feat(trajectory): 新增轨迹事件消费与落库模型
- 新增 ops_device_trajectory 表及轨迹数据对象、Mapper\n- 消费 trajectory-enter / trajectory-leave 事件并做幂等处理\n- 落地设备进入/离开区域记录,补充停留时长与离开原因字段\n- 在服务层封装轨迹写入、关闭未离场记录等核心逻辑
This commit is contained in:
37
sql/mysql/ops_device_trajectory.sql
Normal file
37
sql/mysql/ops_device_trajectory.sql
Normal file
@@ -0,0 +1,37 @@
|
||||
-- =============================================
|
||||
-- 设备轨迹记录表
|
||||
-- 记录工牌设备进出各区域的轨迹(进入时创建,离开时更新)
|
||||
-- =============================================
|
||||
|
||||
CREATE TABLE IF NOT EXISTS `ops_device_trajectory` (
|
||||
`id` BIGINT NOT NULL AUTO_INCREMENT COMMENT '主键',
|
||||
`device_id` BIGINT NOT NULL COMMENT '工牌设备ID',
|
||||
`device_name` VARCHAR(64) DEFAULT NULL COMMENT '设备名称(冗余)',
|
||||
`nickname` VARCHAR(64) DEFAULT NULL COMMENT '设备备注名称(冗余)',
|
||||
`person_id` BIGINT DEFAULT NULL COMMENT '人员ID(预留)',
|
||||
`person_name` VARCHAR(64) DEFAULT NULL COMMENT '人员名称(预留)',
|
||||
`area_id` BIGINT NOT NULL COMMENT '区域ID',
|
||||
`area_name` VARCHAR(128) DEFAULT NULL COMMENT '区域名称(冗余)',
|
||||
`beacon_mac` VARCHAR(64) DEFAULT NULL COMMENT '匹配的Beacon MAC',
|
||||
`enter_time` DATETIME NOT NULL COMMENT '进入时间',
|
||||
`leave_time` DATETIME DEFAULT NULL COMMENT '离开时间',
|
||||
`duration_seconds` INT DEFAULT NULL COMMENT '停留时长(秒)',
|
||||
`leave_reason` VARCHAR(32) DEFAULT NULL COMMENT '离开原因: SIGNAL_LOSS/AREA_SWITCH/DEVICE_OFFLINE',
|
||||
`enter_rssi` INT DEFAULT NULL COMMENT '进入时RSSI',
|
||||
`tenant_id` BIGINT NOT NULL DEFAULT 0 COMMENT '租户ID',
|
||||
`creator` VARCHAR(64) DEFAULT '' COMMENT '创建者',
|
||||
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||
`updater` VARCHAR(64) DEFAULT '' COMMENT '更新者',
|
||||
`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
|
||||
`deleted` BIT(1) NOT NULL DEFAULT b'0' COMMENT '是否删除',
|
||||
PRIMARY KEY (`id`),
|
||||
INDEX `idx_device_enter` (`device_id`, `enter_time`),
|
||||
INDEX `idx_area_enter` (`area_id`, `enter_time`),
|
||||
INDEX `idx_device_area` (`device_id`, `area_id`),
|
||||
INDEX `idx_tenant` (`tenant_id`),
|
||||
INDEX `idx_device_area_leave` (`device_id`, `area_id`, `leave_time`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='设备轨迹记录';
|
||||
|
||||
-- 优化:为 selectOpenRecord 查询(device_id + area_id + leave_time IS NULL)添加索引
|
||||
-- 注:已有数据的线上环境需单独执行以下 ALTER(如数据量大请在低峰期执行):
|
||||
-- ALTER TABLE `ops_device_trajectory` ADD INDEX `idx_device_area_leave` (`device_id`, `area_id`, `leave_time`);
|
||||
@@ -0,0 +1,89 @@
|
||||
package com.viewsh.module.ops.environment.integration.consumer;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.viewsh.module.ops.environment.integration.dto.TrajectoryEnterEventDTO;
|
||||
import com.viewsh.module.ops.environment.service.trajectory.DeviceTrajectoryService;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.time.format.DateTimeParseException;
|
||||
|
||||
/**
|
||||
* 轨迹进入区域事件消费者
|
||||
* <p>
|
||||
* 订阅 IoT 模块发布的轨迹进入事件,创建轨迹记录
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RocketMQMessageListener(
|
||||
topic = "trajectory-enter",
|
||||
consumerGroup = "ops-trajectory-enter-group",
|
||||
consumeMode = ConsumeMode.CONCURRENTLY,
|
||||
selectorExpression = "*",
|
||||
accessKey = "${rocketmq.consumer.access-key:}",
|
||||
secretKey = "${rocketmq.consumer.secret-key:}"
|
||||
)
|
||||
public class TrajectoryEnterEventHandler implements RocketMQListener<String> {
|
||||
|
||||
@Resource
|
||||
private ObjectMapper objectMapper;
|
||||
|
||||
@Resource
|
||||
private IntegrationEventDeduplicationService deduplicationService;
|
||||
|
||||
@Resource
|
||||
private DeviceTrajectoryService trajectoryService;
|
||||
|
||||
@Override
|
||||
public void onMessage(String message) {
|
||||
try {
|
||||
TrajectoryEnterEventDTO event = objectMapper.readValue(message, TrajectoryEnterEventDTO.class);
|
||||
|
||||
// 幂等性检查
|
||||
if (!deduplicationService.tryConsume(event.getEventId())) {
|
||||
log.debug("[TrajectoryEnterHandler] 重复消息,跳过:eventId={}", event.getEventId());
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("[TrajectoryEnterHandler] 收到进入事件:eventId={}, deviceId={}, areaId={}",
|
||||
event.getEventId(), event.getDeviceId(), event.getAreaId());
|
||||
|
||||
// 解析事件时间
|
||||
LocalDateTime enterTime = parseEventTime(event.getEventTime());
|
||||
|
||||
// 创建轨迹记录
|
||||
trajectoryService.recordEnter(
|
||||
event.getDeviceId(),
|
||||
event.getDeviceName(),
|
||||
event.getNickname(),
|
||||
event.getAreaId(),
|
||||
event.getBeaconMac(),
|
||||
event.getEnterRssi(),
|
||||
enterTime);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[TrajectoryEnterHandler] 消息处理失败:message={}", message, e);
|
||||
throw new RuntimeException("轨迹进入事件处理失败", e);
|
||||
}
|
||||
}
|
||||
|
||||
private LocalDateTime parseEventTime(String eventTime) {
|
||||
if (eventTime == null || eventTime.isEmpty()) {
|
||||
return LocalDateTime.now();
|
||||
}
|
||||
try {
|
||||
return LocalDateTime.parse(eventTime, DateTimeFormatter.ISO_LOCAL_DATE_TIME);
|
||||
} catch (DateTimeParseException e) {
|
||||
log.warn("[TrajectoryEnterHandler] 事件时间解析失败,使用当前时间:eventTime={}", eventTime, e);
|
||||
return LocalDateTime.now();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,87 @@
|
||||
package com.viewsh.module.ops.environment.integration.consumer;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.viewsh.module.ops.environment.integration.dto.TrajectoryLeaveEventDTO;
|
||||
import com.viewsh.module.ops.environment.service.trajectory.DeviceTrajectoryService;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.time.format.DateTimeParseException;
|
||||
|
||||
/**
|
||||
* 轨迹离开区域事件消费者
|
||||
* <p>
|
||||
* 订阅 IoT 模块发布的轨迹离开事件,更新轨迹记录的离开信息
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RocketMQMessageListener(
|
||||
topic = "trajectory-leave",
|
||||
consumerGroup = "ops-trajectory-leave-group",
|
||||
consumeMode = ConsumeMode.CONCURRENTLY,
|
||||
selectorExpression = "*",
|
||||
accessKey = "${rocketmq.consumer.access-key:}",
|
||||
secretKey = "${rocketmq.consumer.secret-key:}"
|
||||
)
|
||||
public class TrajectoryLeaveEventHandler implements RocketMQListener<String> {
|
||||
|
||||
@Resource
|
||||
private ObjectMapper objectMapper;
|
||||
|
||||
@Resource
|
||||
private IntegrationEventDeduplicationService deduplicationService;
|
||||
|
||||
@Resource
|
||||
private DeviceTrajectoryService trajectoryService;
|
||||
|
||||
@Override
|
||||
public void onMessage(String message) {
|
||||
try {
|
||||
TrajectoryLeaveEventDTO event = objectMapper.readValue(message, TrajectoryLeaveEventDTO.class);
|
||||
|
||||
// 幂等性检查
|
||||
if (!deduplicationService.tryConsume(event.getEventId())) {
|
||||
log.debug("[TrajectoryLeaveHandler] 重复消息,跳过:eventId={}", event.getEventId());
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("[TrajectoryLeaveHandler] 收到离开事件:eventId={}, deviceId={}, areaId={}, reason={}",
|
||||
event.getEventId(), event.getDeviceId(), event.getAreaId(), event.getLeaveReason());
|
||||
|
||||
// 解析事件时间
|
||||
LocalDateTime leaveTime = parseEventTime(event.getEventTime());
|
||||
|
||||
// 更新轨迹记录
|
||||
trajectoryService.recordLeave(
|
||||
event.getDeviceId(),
|
||||
event.getAreaId(),
|
||||
event.getLeaveReason(),
|
||||
event.getEnterTimestamp(),
|
||||
leaveTime);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[TrajectoryLeaveHandler] 消息处理失败:message={}", message, e);
|
||||
throw new RuntimeException("轨迹离开事件处理失败", e);
|
||||
}
|
||||
}
|
||||
|
||||
private LocalDateTime parseEventTime(String eventTime) {
|
||||
if (eventTime == null || eventTime.isEmpty()) {
|
||||
return LocalDateTime.now();
|
||||
}
|
||||
try {
|
||||
return LocalDateTime.parse(eventTime, DateTimeFormatter.ISO_LOCAL_DATE_TIME);
|
||||
} catch (DateTimeParseException e) {
|
||||
log.warn("[TrajectoryLeaveHandler] 事件时间解析失败,使用当前时间:eventTime={}", eventTime, e);
|
||||
return LocalDateTime.now();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,65 @@
|
||||
package com.viewsh.module.ops.environment.integration.dto;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* 轨迹进入区域事件 DTO
|
||||
* <p>
|
||||
* 由 IoT 模块发布,Ops 模块消费
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class TrajectoryEnterEventDTO {
|
||||
|
||||
/**
|
||||
* 事件ID(UUID,用于幂等性控制)
|
||||
*/
|
||||
private String eventId;
|
||||
|
||||
/**
|
||||
* 设备ID(工牌)
|
||||
*/
|
||||
private Long deviceId;
|
||||
|
||||
/**
|
||||
* 设备名称
|
||||
*/
|
||||
private String deviceName;
|
||||
|
||||
/**
|
||||
* 设备备注名称
|
||||
*/
|
||||
private String nickname;
|
||||
|
||||
/**
|
||||
* 区域ID
|
||||
*/
|
||||
private Long areaId;
|
||||
|
||||
/**
|
||||
* 匹配的 Beacon MAC 地址
|
||||
*/
|
||||
private String beaconMac;
|
||||
|
||||
/**
|
||||
* 进入时的 RSSI 值
|
||||
*/
|
||||
private Integer enterRssi;
|
||||
|
||||
/**
|
||||
* 事件时间(ISO 格式)
|
||||
*/
|
||||
private String eventTime;
|
||||
|
||||
/**
|
||||
* 租户ID
|
||||
*/
|
||||
private Long tenantId;
|
||||
}
|
||||
@@ -0,0 +1,72 @@
|
||||
package com.viewsh.module.ops.environment.integration.dto;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* 轨迹离开区域事件 DTO
|
||||
* <p>
|
||||
* 由 IoT 模块发布,Ops 模块消费
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class TrajectoryLeaveEventDTO {
|
||||
|
||||
/**
|
||||
* 事件ID(UUID,用于幂等性控制)
|
||||
*/
|
||||
private String eventId;
|
||||
|
||||
/**
|
||||
* 设备ID(工牌)
|
||||
*/
|
||||
private Long deviceId;
|
||||
|
||||
/**
|
||||
* 设备名称
|
||||
*/
|
||||
private String deviceName;
|
||||
|
||||
/**
|
||||
* 设备备注名称
|
||||
*/
|
||||
private String nickname;
|
||||
|
||||
/**
|
||||
* 区域ID
|
||||
*/
|
||||
private Long areaId;
|
||||
|
||||
/**
|
||||
* 匹配的 Beacon MAC 地址
|
||||
*/
|
||||
private String beaconMac;
|
||||
|
||||
/**
|
||||
* 离开原因
|
||||
* <p>
|
||||
* SIGNAL_LOSS / AREA_SWITCH / DEVICE_OFFLINE
|
||||
*/
|
||||
private String leaveReason;
|
||||
|
||||
/**
|
||||
* 进入时间戳(毫秒),用于匹配轨迹记录
|
||||
*/
|
||||
private Long enterTimestamp;
|
||||
|
||||
/**
|
||||
* 事件时间(ISO 格式)
|
||||
*/
|
||||
private String eventTime;
|
||||
|
||||
/**
|
||||
* 租户ID
|
||||
*/
|
||||
private Long tenantId;
|
||||
}
|
||||
@@ -0,0 +1,65 @@
|
||||
package com.viewsh.module.ops.environment.service.trajectory;
|
||||
|
||||
import com.viewsh.framework.common.pojo.PageResult;
|
||||
import com.viewsh.module.ops.service.trajectory.dto.TrajectoryPageReqDTO;
|
||||
import com.viewsh.module.ops.service.trajectory.dto.TrajectoryRespDTO;
|
||||
import com.viewsh.module.ops.service.trajectory.dto.TrajectorySummaryDTO;
|
||||
|
||||
import java.time.LocalDate;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 设备轨迹服务
|
||||
* <p>
|
||||
* 负责轨迹记录的创建和更新
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
public interface DeviceTrajectoryService {
|
||||
|
||||
/**
|
||||
* 记录设备进入区域
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @param deviceName 设备名称
|
||||
* @param nickname 设备备注名称
|
||||
* @param areaId 区域ID
|
||||
* @param beaconMac Beacon MAC
|
||||
* @param enterRssi 进入时 RSSI
|
||||
* @param enterTime 进入时间
|
||||
*/
|
||||
void recordEnter(Long deviceId, String deviceName, String nickname, Long areaId,
|
||||
String beaconMac, Integer enterRssi, LocalDateTime enterTime);
|
||||
|
||||
/**
|
||||
* 记录设备离开区域
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @param areaId 区域ID
|
||||
* @param leaveReason 离开原因
|
||||
* @param enterTimestamp 进入时间戳(毫秒),用于匹配记录
|
||||
* @param leaveTime 离开时间
|
||||
*/
|
||||
void recordLeave(Long deviceId, Long areaId, String leaveReason,
|
||||
Long enterTimestamp, LocalDateTime leaveTime);
|
||||
|
||||
/**
|
||||
* 分页查询轨迹记录
|
||||
*/
|
||||
PageResult<TrajectoryRespDTO> getTrajectoryPage(TrajectoryPageReqDTO req);
|
||||
|
||||
/**
|
||||
* 查询某设备某天的轨迹时间线
|
||||
*/
|
||||
List<TrajectoryRespDTO> getTimeline(Long deviceId, LocalDate date);
|
||||
|
||||
/**
|
||||
* 查询轨迹统计摘要
|
||||
*
|
||||
* @param deviceId 设备ID(必填)
|
||||
* @param date 日期(必填)
|
||||
*/
|
||||
TrajectorySummaryDTO getSummary(Long deviceId, LocalDate date);
|
||||
|
||||
}
|
||||
@@ -0,0 +1,284 @@
|
||||
package com.viewsh.module.ops.environment.service.trajectory;
|
||||
|
||||
import com.viewsh.framework.common.pojo.PageResult;
|
||||
import com.viewsh.framework.common.util.object.BeanUtils;
|
||||
import com.viewsh.module.ops.dal.dataobject.area.OpsBusAreaDO;
|
||||
import com.viewsh.module.ops.dal.dataobject.trajectory.OpsDeviceTrajectoryDO;
|
||||
import com.viewsh.module.ops.dal.mysql.area.OpsBusAreaMapper;
|
||||
import com.viewsh.module.ops.dal.mysql.trajectory.OpsDeviceTrajectoryMapper;
|
||||
import com.viewsh.module.ops.service.trajectory.dto.TrajectoryPageReqDTO;
|
||||
import com.viewsh.module.ops.service.trajectory.dto.TrajectoryRespDTO;
|
||||
import com.viewsh.module.ops.service.trajectory.dto.TrajectorySummaryDTO;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDate;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 设备轨迹服务实现
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
public class DeviceTrajectoryServiceImpl implements DeviceTrajectoryService {
|
||||
|
||||
@Resource
|
||||
private OpsDeviceTrajectoryMapper trajectoryMapper;
|
||||
|
||||
@Resource
|
||||
private OpsBusAreaMapper areaMapper;
|
||||
|
||||
// ==================== 写入方法 ====================
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void recordEnter(Long deviceId, String deviceName, String nickname, Long areaId,
|
||||
String beaconMac, Integer enterRssi, LocalDateTime enterTime) {
|
||||
// 使用 SELECT ... FOR UPDATE 防止并发创建重复记录
|
||||
OpsDeviceTrajectoryDO openRecord = trajectoryMapper.selectOpenRecordForUpdate(deviceId, areaId);
|
||||
if (openRecord != null) {
|
||||
log.warn("[Trajectory] 设备已有未关闭的轨迹记录,跳过创建:deviceId={}, areaId={}, existingId={}",
|
||||
deviceId, areaId, openRecord.getId());
|
||||
return;
|
||||
}
|
||||
|
||||
OpsDeviceTrajectoryDO record = OpsDeviceTrajectoryDO.builder()
|
||||
.deviceId(deviceId)
|
||||
.deviceName(deviceName)
|
||||
.nickname(nickname)
|
||||
.areaId(areaId)
|
||||
.beaconMac(beaconMac)
|
||||
.enterRssi(enterRssi)
|
||||
.enterTime(enterTime)
|
||||
.build();
|
||||
|
||||
// 填充区域名称冗余字段
|
||||
fillAreaName(record);
|
||||
|
||||
trajectoryMapper.insert(record);
|
||||
log.info("[Trajectory] 创建轨迹记录:id={}, deviceId={}, areaId={}, enterTime={}",
|
||||
record.getId(), deviceId, areaId, enterTime);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void recordLeave(Long deviceId, Long areaId, String leaveReason,
|
||||
Long enterTimestamp, LocalDateTime leaveTime) {
|
||||
// 优先使用 enterTimestamp 精确匹配,避免关闭错误的记录
|
||||
OpsDeviceTrajectoryDO record = null;
|
||||
if (enterTimestamp != null) {
|
||||
LocalDateTime enterTime = LocalDateTime.ofInstant(
|
||||
Instant.ofEpochMilli(enterTimestamp), ZoneId.systemDefault());
|
||||
record = trajectoryMapper.selectOpenRecordByEnterTimeForUpdate(deviceId, areaId, enterTime);
|
||||
}
|
||||
// 降级:按 deviceId + areaId 查询最近的未关闭记录
|
||||
if (record == null) {
|
||||
record = trajectoryMapper.selectOpenRecordForUpdate(deviceId, areaId);
|
||||
}
|
||||
if (record == null) {
|
||||
log.warn("[Trajectory] 未找到匹配的轨迹记录,跳过更新:deviceId={}, areaId={}, enterTimestamp={}",
|
||||
deviceId, areaId, enterTimestamp);
|
||||
return;
|
||||
}
|
||||
|
||||
int durationSeconds = 0;
|
||||
if (record.getEnterTime() != null && leaveTime != null) {
|
||||
durationSeconds = (int) Duration.between(record.getEnterTime(), leaveTime).getSeconds();
|
||||
if (durationSeconds < 0) {
|
||||
durationSeconds = 0;
|
||||
}
|
||||
}
|
||||
|
||||
record.setLeaveTime(leaveTime);
|
||||
record.setDurationSeconds(durationSeconds);
|
||||
record.setLeaveReason(leaveReason);
|
||||
|
||||
trajectoryMapper.updateById(record);
|
||||
log.info("[Trajectory] 更新轨迹记录(离开):id={}, deviceId={}, areaId={}, duration={}s, reason={}",
|
||||
record.getId(), deviceId, areaId, durationSeconds, leaveReason);
|
||||
}
|
||||
|
||||
// ==================== 查询方法 ====================
|
||||
|
||||
@Override
|
||||
public PageResult<TrajectoryRespDTO> getTrajectoryPage(TrajectoryPageReqDTO req) {
|
||||
PageResult<OpsDeviceTrajectoryDO> pageResult = trajectoryMapper.selectPage(req);
|
||||
PageResult<TrajectoryRespDTO> result = BeanUtils.toBean(pageResult, TrajectoryRespDTO.class);
|
||||
enrichWithAreaInfo(result.getList());
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TrajectoryRespDTO> getTimeline(Long deviceId, LocalDate date) {
|
||||
List<OpsDeviceTrajectoryDO> list = trajectoryMapper.selectTimeline(deviceId, date);
|
||||
List<TrajectoryRespDTO> result = BeanUtils.toBean(list, TrajectoryRespDTO.class);
|
||||
enrichWithAreaInfo(result);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TrajectorySummaryDTO getSummary(Long deviceId, LocalDate date) {
|
||||
List<OpsDeviceTrajectoryDO> list = trajectoryMapper.selectTimeline(deviceId, date);
|
||||
if (list.isEmpty()) {
|
||||
return TrajectorySummaryDTO.builder()
|
||||
.totalRecords(0L)
|
||||
.completedRecords(0L)
|
||||
.coveredAreaCount(0L)
|
||||
.totalDurationSeconds(0L)
|
||||
.avgDurationSeconds(0L)
|
||||
.maxDurationSeconds(0L)
|
||||
.build();
|
||||
}
|
||||
|
||||
long totalRecords = list.size();
|
||||
long coveredAreaCount = list.stream()
|
||||
.map(OpsDeviceTrajectoryDO::getAreaId)
|
||||
.distinct()
|
||||
.count();
|
||||
|
||||
// 只统计已关闭(有 durationSeconds)的记录
|
||||
List<Integer> durations = list.stream()
|
||||
.map(OpsDeviceTrajectoryDO::getDurationSeconds)
|
||||
.filter(Objects::nonNull)
|
||||
.toList();
|
||||
|
||||
long completedRecords = durations.size();
|
||||
long totalDuration = durations.stream().mapToLong(Integer::longValue).sum();
|
||||
long avgDuration = durations.isEmpty() ? 0 : totalDuration / durations.size();
|
||||
long maxDuration = durations.stream().mapToInt(Integer::intValue).max().orElse(0);
|
||||
|
||||
return TrajectorySummaryDTO.builder()
|
||||
.totalRecords(totalRecords)
|
||||
.completedRecords(completedRecords)
|
||||
.coveredAreaCount(coveredAreaCount)
|
||||
.totalDurationSeconds(totalDuration)
|
||||
.avgDurationSeconds(avgDuration)
|
||||
.maxDurationSeconds(maxDuration)
|
||||
.build();
|
||||
}
|
||||
|
||||
// ==================== 内部方法 ====================
|
||||
|
||||
/**
|
||||
* 填充单条记录的区域名称
|
||||
*/
|
||||
private void fillAreaName(OpsDeviceTrajectoryDO record) {
|
||||
if (record.getAreaId() == null) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
OpsBusAreaDO area = areaMapper.selectById(record.getAreaId());
|
||||
if (area != null) {
|
||||
record.setAreaName(area.getAreaName());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("[Trajectory] 查询区域名称失败:areaId={}", record.getAreaId(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量填充轨迹记录的区域信息(areaName、buildingName、floorNo)
|
||||
*/
|
||||
private void enrichWithAreaInfo(List<TrajectoryRespDTO> list) {
|
||||
if (list == null || list.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 收集所有 areaId
|
||||
Set<Long> areaIds = list.stream()
|
||||
.map(TrajectoryRespDTO::getAreaId)
|
||||
.filter(Objects::nonNull)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
if (areaIds.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 批量查询区域
|
||||
List<OpsBusAreaDO> areas = areaMapper.selectBatchIds(areaIds);
|
||||
Map<Long, OpsBusAreaDO> areaMap = areas.stream()
|
||||
.collect(Collectors.toMap(OpsBusAreaDO::getId, a -> a, (a, b) -> a));
|
||||
|
||||
// 收集需要查询的父级 ID(楼栋信息)
|
||||
Set<Long> parentIds = new HashSet<>();
|
||||
for (OpsBusAreaDO area : areas) {
|
||||
if (area.getParentPath() != null && !area.getParentPath().isEmpty()) {
|
||||
// parentPath 格式: "/1/2/3",提取所有祖先 ID
|
||||
for (String idStr : area.getParentPath().split("/")) {
|
||||
if (!idStr.isEmpty()) {
|
||||
try {
|
||||
parentIds.add(Long.parseLong(idStr));
|
||||
} catch (NumberFormatException ignored) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (area.getParentId() != null) {
|
||||
parentIds.add(area.getParentId());
|
||||
}
|
||||
}
|
||||
parentIds.removeAll(areaIds); // 排除已查过的
|
||||
|
||||
// 批量查询父级区域
|
||||
Map<Long, OpsBusAreaDO> allAreaMap = new HashMap<>(areaMap);
|
||||
if (!parentIds.isEmpty()) {
|
||||
List<OpsBusAreaDO> parentAreas = areaMapper.selectBatchIds(parentIds);
|
||||
for (OpsBusAreaDO pa : parentAreas) {
|
||||
allAreaMap.put(pa.getId(), pa);
|
||||
}
|
||||
}
|
||||
|
||||
// 填充每条记录
|
||||
for (TrajectoryRespDTO dto : list) {
|
||||
if (dto.getAreaId() == null) {
|
||||
continue;
|
||||
}
|
||||
OpsBusAreaDO area = areaMap.get(dto.getAreaId());
|
||||
if (area == null) {
|
||||
continue;
|
||||
}
|
||||
dto.setAreaName(area.getAreaName());
|
||||
dto.setFloorNo(area.getFloorNo());
|
||||
dto.setBuildingName(findBuildingName(area, allAreaMap));
|
||||
|
||||
// 如果 floorNo 为空,尝试从 FLOOR 类型的父级获取
|
||||
if (dto.getFloorNo() == null && area.getParentId() != null) {
|
||||
OpsBusAreaDO parent = allAreaMap.get(area.getParentId());
|
||||
if (parent != null && "FLOOR".equals(parent.getAreaType())) {
|
||||
dto.setFloorNo(parent.getFloorNo());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 沿 parentPath 向上查找 BUILDING 类型的祖先区域名称
|
||||
*/
|
||||
private String findBuildingName(OpsBusAreaDO area, Map<Long, OpsBusAreaDO> allAreaMap) {
|
||||
if (area.getParentPath() == null) {
|
||||
return null;
|
||||
}
|
||||
for (String idStr : area.getParentPath().split("/")) {
|
||||
if (idStr.isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
OpsBusAreaDO ancestor = allAreaMap.get(Long.parseLong(idStr));
|
||||
if (ancestor != null && "BUILDING".equals(ancestor.getAreaType())) {
|
||||
return ancestor.getAreaName();
|
||||
}
|
||||
} catch (NumberFormatException ignored) {
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
package com.viewsh.module.ops.enums;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* 轨迹离开原因枚举
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
public enum LeaveReasonEnum {
|
||||
|
||||
SIGNAL_LOSS("SIGNAL_LOSS", "信号丢失"),
|
||||
AREA_SWITCH("AREA_SWITCH", "切换到其他区域"),
|
||||
DEVICE_OFFLINE("DEVICE_OFFLINE", "设备离线");
|
||||
|
||||
private final String reason;
|
||||
private final String description;
|
||||
|
||||
}
|
||||
@@ -0,0 +1,104 @@
|
||||
package com.viewsh.module.ops.dal.dataobject.trajectory;
|
||||
|
||||
import com.baomidou.mybatisplus.annotation.KeySequence;
|
||||
import com.baomidou.mybatisplus.annotation.TableId;
|
||||
import com.baomidou.mybatisplus.annotation.TableName;
|
||||
import com.viewsh.framework.tenant.core.db.TenantBaseDO;
|
||||
import lombok.*;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
/**
|
||||
* 设备轨迹记录 DO
|
||||
* <p>
|
||||
* 记录工牌设备进出各区域的轨迹
|
||||
* 一条记录表示一次"进入-离开"周期
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
@TableName("ops_device_trajectory")
|
||||
@KeySequence("ops_device_trajectory_seq")
|
||||
@Data
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
@ToString(callSuper = true)
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class OpsDeviceTrajectoryDO extends TenantBaseDO {
|
||||
|
||||
/**
|
||||
* 主键
|
||||
*/
|
||||
@TableId
|
||||
private Long id;
|
||||
|
||||
/**
|
||||
* 工牌设备ID
|
||||
*/
|
||||
private Long deviceId;
|
||||
|
||||
/**
|
||||
* 设备名称(冗余)
|
||||
*/
|
||||
private String deviceName;
|
||||
|
||||
/**
|
||||
* 设备备注名称(冗余)
|
||||
*/
|
||||
private String nickname;
|
||||
|
||||
/**
|
||||
* 人员ID(预留)
|
||||
*/
|
||||
private Long personId;
|
||||
|
||||
/**
|
||||
* 人员名称(预留)
|
||||
*/
|
||||
private String personName;
|
||||
|
||||
/**
|
||||
* 区域ID
|
||||
*/
|
||||
private Long areaId;
|
||||
|
||||
/**
|
||||
* 区域名称(冗余)
|
||||
*/
|
||||
private String areaName;
|
||||
|
||||
/**
|
||||
* 匹配的 Beacon MAC
|
||||
*/
|
||||
private String beaconMac;
|
||||
|
||||
/**
|
||||
* 进入时间
|
||||
*/
|
||||
private LocalDateTime enterTime;
|
||||
|
||||
/**
|
||||
* 离开时间
|
||||
*/
|
||||
private LocalDateTime leaveTime;
|
||||
|
||||
/**
|
||||
* 停留时长(秒)
|
||||
*/
|
||||
private Integer durationSeconds;
|
||||
|
||||
/**
|
||||
* 离开原因
|
||||
* <p>
|
||||
* SIGNAL_LOSS - 信号丢失
|
||||
* AREA_SWITCH - 切换到其他区域
|
||||
* DEVICE_OFFLINE - 设备离线
|
||||
*/
|
||||
private String leaveReason;
|
||||
|
||||
/**
|
||||
* 进入时 RSSI
|
||||
*/
|
||||
private Integer enterRssi;
|
||||
|
||||
}
|
||||
@@ -0,0 +1,83 @@
|
||||
package com.viewsh.module.ops.dal.mysql.trajectory;
|
||||
|
||||
import com.viewsh.framework.common.pojo.PageResult;
|
||||
import com.viewsh.framework.mybatis.core.mapper.BaseMapperX;
|
||||
import com.viewsh.framework.mybatis.core.query.LambdaQueryWrapperX;
|
||||
import com.viewsh.module.ops.dal.dataobject.trajectory.OpsDeviceTrajectoryDO;
|
||||
import com.viewsh.module.ops.service.trajectory.dto.TrajectoryPageReqDTO;
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
|
||||
import java.time.LocalDate;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 设备轨迹记录 Mapper
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
@Mapper
|
||||
public interface OpsDeviceTrajectoryMapper extends BaseMapperX<OpsDeviceTrajectoryDO> {
|
||||
|
||||
/**
|
||||
* 查询设备在某区域最近一条未关闭的轨迹记录
|
||||
*/
|
||||
default OpsDeviceTrajectoryDO selectOpenRecord(Long deviceId, Long areaId) {
|
||||
return selectOne(new LambdaQueryWrapperX<OpsDeviceTrajectoryDO>()
|
||||
.eq(OpsDeviceTrajectoryDO::getDeviceId, deviceId)
|
||||
.eq(OpsDeviceTrajectoryDO::getAreaId, areaId)
|
||||
.isNull(OpsDeviceTrajectoryDO::getLeaveTime)
|
||||
.orderByDesc(OpsDeviceTrajectoryDO::getEnterTime)
|
||||
.last("LIMIT 1"));
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询设备在某区域未关闭的轨迹记录(加锁,防止并发竞态)
|
||||
*/
|
||||
default OpsDeviceTrajectoryDO selectOpenRecordForUpdate(Long deviceId, Long areaId) {
|
||||
return selectOne(new LambdaQueryWrapperX<OpsDeviceTrajectoryDO>()
|
||||
.eq(OpsDeviceTrajectoryDO::getDeviceId, deviceId)
|
||||
.eq(OpsDeviceTrajectoryDO::getAreaId, areaId)
|
||||
.isNull(OpsDeviceTrajectoryDO::getLeaveTime)
|
||||
.orderByDesc(OpsDeviceTrajectoryDO::getEnterTime)
|
||||
.last("LIMIT 1 FOR UPDATE"));
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询设备在某区域、按进入时间精确匹配的未关闭轨迹记录(加锁)
|
||||
*/
|
||||
default OpsDeviceTrajectoryDO selectOpenRecordByEnterTimeForUpdate(Long deviceId, Long areaId,
|
||||
LocalDateTime enterTime) {
|
||||
return selectOne(new LambdaQueryWrapperX<OpsDeviceTrajectoryDO>()
|
||||
.eq(OpsDeviceTrajectoryDO::getDeviceId, deviceId)
|
||||
.eq(OpsDeviceTrajectoryDO::getAreaId, areaId)
|
||||
.eq(OpsDeviceTrajectoryDO::getEnterTime, enterTime)
|
||||
.isNull(OpsDeviceTrajectoryDO::getLeaveTime)
|
||||
.last("LIMIT 1 FOR UPDATE"));
|
||||
}
|
||||
|
||||
/**
|
||||
* 分页查询轨迹记录
|
||||
*/
|
||||
default PageResult<OpsDeviceTrajectoryDO> selectPage(TrajectoryPageReqDTO req) {
|
||||
return selectPage(req, new LambdaQueryWrapperX<OpsDeviceTrajectoryDO>()
|
||||
.eqIfPresent(OpsDeviceTrajectoryDO::getDeviceId, req.getDeviceId())
|
||||
.eqIfPresent(OpsDeviceTrajectoryDO::getAreaId, req.getAreaId())
|
||||
.betweenIfPresent(OpsDeviceTrajectoryDO::getEnterTime, req.getEnterTime())
|
||||
.orderByDesc(OpsDeviceTrajectoryDO::getEnterTime));
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询某设备某天的轨迹时间线(不分页,按进入时间升序)
|
||||
*/
|
||||
default List<OpsDeviceTrajectoryDO> selectTimeline(Long deviceId, LocalDate date) {
|
||||
LocalDateTime start = date.atStartOfDay();
|
||||
LocalDateTime end = date.plusDays(1).atStartOfDay();
|
||||
return selectList(new LambdaQueryWrapperX<OpsDeviceTrajectoryDO>()
|
||||
.eq(OpsDeviceTrajectoryDO::getDeviceId, deviceId)
|
||||
.ge(OpsDeviceTrajectoryDO::getEnterTime, start)
|
||||
.lt(OpsDeviceTrajectoryDO::getEnterTime, end)
|
||||
.orderByAsc(OpsDeviceTrajectoryDO::getEnterTime));
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user