feat(trajectory): 新增轨迹检测与 Beacon 注册表

This commit is contained in:
lzh
2026-03-31 22:53:06 +08:00
parent a9941a29a9
commit 11dcb57ff3
17 changed files with 1216 additions and 25 deletions

View File

@@ -0,0 +1,36 @@
package com.viewsh.module.iot.api.trajectory;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 设备实时位置 DTO
*
* @author lzh
*/
@Schema(description = "RPC - 设备实时位置")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DeviceLocationDTO {
@Schema(description = "设备ID", example = "31")
private Long deviceId;
@Schema(description = "当前所在区域ID", example = "1301")
private Long areaId;
@Schema(description = "进入时间(毫秒时间戳)", example = "1711872600000")
private Long enterTime;
@Schema(description = "匹配的Beacon MAC", example = "F0:C8:60:1D:10:BB")
private String beaconMac;
@Schema(description = "是否在某区域内")
private Boolean inArea;
}

View File

@@ -0,0 +1,31 @@
package com.viewsh.module.iot.api.trajectory;
import com.viewsh.framework.common.pojo.CommonResult;
import com.viewsh.module.iot.enums.ApiConstants;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
/**
* 轨迹实时状态 API
* <p>
* 提供 RPC 接口供 Ops 模块查询设备当前位置
*
* @author lzh
*/
@FeignClient(name = ApiConstants.NAME)
@Tag(name = "RPC 服务 - 轨迹实时状态")
public interface TrajectoryStateApi {
String PREFIX = ApiConstants.PREFIX + "/trajectory";
@GetMapping(PREFIX + "/current-location")
@Operation(summary = "查询设备当前位置")
CommonResult<DeviceLocationDTO> getCurrentLocation(
@Parameter(description = "设备ID", required = true, example = "31")
@RequestParam("deviceId") Long deviceId);
}

View File

@@ -0,0 +1,22 @@
package com.viewsh.module.iot.core.integration.constants;
/**
* 轨迹事件 Topic 常量
* <p>
* 定义 IoT → Ops 的轨迹事件 Topic
*
* @author lzh
*/
public interface TrajectoryTopics {
/**
* 进入区域事件
*/
String TRAJECTORY_ENTER = "trajectory-enter";
/**
* 离开区域事件
*/
String TRAJECTORY_LEAVE = "trajectory-leave";
}

View File

@@ -0,0 +1,72 @@
package com.viewsh.module.iot.core.integration.event.trajectory;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
import java.util.UUID;
/**
* 轨迹进入区域事件
* <p>
* 当工牌蓝牙信号满足强进条件确认进入某区域时发布
* Topic: trajectory-enter
*
* @author lzh
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class TrajectoryEnterEvent {
/**
* 事件ID唯一标识用于幂等性处理
*/
@Builder.Default
private String eventId = UUID.randomUUID().toString();
/**
* 设备ID工牌
*/
private Long deviceId;
/**
* 设备名称
*/
private String deviceName;
/**
* 设备备注名称
*/
private String nickname;
/**
* 区域ID
*/
private Long areaId;
/**
* 匹配的 Beacon MAC 地址
*/
private String beaconMac;
/**
* 进入时的 RSSI 值
*/
private Integer enterRssi;
/**
* 事件时间ISO-8601 字符串,跨模块序列化安全)
*/
@Builder.Default
private String eventTime = LocalDateTime.now().toString();
/**
* 租户ID
*/
private Long tenantId;
}

View File

@@ -0,0 +1,81 @@
package com.viewsh.module.iot.core.integration.event.trajectory;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
import java.util.UUID;
/**
* 轨迹离开区域事件
* <p>
* 当工牌蓝牙信号满足弱出条件确认离开某区域时发布
* Topic: trajectory-leave
*
* @author lzh
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class TrajectoryLeaveEvent {
/**
* 事件ID唯一标识用于幂等性处理
*/
@Builder.Default
private String eventId = UUID.randomUUID().toString();
/**
* 设备ID工牌
*/
private Long deviceId;
/**
* 设备名称
*/
private String deviceName;
/**
* 设备备注名称
*/
private String nickname;
/**
* 区域ID
*/
private Long areaId;
/**
* 匹配的 Beacon MAC 地址
*/
private String beaconMac;
/**
* 离开原因
* <p>
* SIGNAL_LOSS - 信号丢失(弱出条件满足)
* AREA_SWITCH - 切换到其他区域
* DEVICE_OFFLINE - 设备离线
*/
private String leaveReason;
/**
* 进入时间戳(毫秒),用于 Ops 端匹配记录
*/
private Long enterTimestamp;
/**
* 事件时间ISO-8601 字符串,跨模块序列化安全)
*/
@Builder.Default
private String eventTime = LocalDateTime.now().toString();
/**
* 租户ID
*/
private Long tenantId;
}

View File

@@ -0,0 +1,46 @@
package com.viewsh.module.iot.controller.rpc;
import com.viewsh.framework.common.pojo.CommonResult;
import com.viewsh.module.iot.api.trajectory.DeviceLocationDTO;
import com.viewsh.module.iot.api.trajectory.TrajectoryStateApi;
import com.viewsh.module.iot.dal.redis.clean.TrajectoryStateRedisDAO;
import jakarta.annotation.Resource;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.RestController;
import static com.viewsh.framework.common.pojo.CommonResult.success;
/**
* 轨迹实时状态 RPC Controller
* <p>
* 实现 {@link TrajectoryStateApi},从 Redis 读取设备当前位置
*
* @author lzh
*/
@RestController
@Validated
public class TrajectoryStateController implements TrajectoryStateApi {
@Resource
private TrajectoryStateRedisDAO trajectoryStateRedisDAO;
@Override
public CommonResult<DeviceLocationDTO> getCurrentLocation(Long deviceId) {
TrajectoryStateRedisDAO.CurrentAreaInfo currentArea = trajectoryStateRedisDAO.getCurrentArea(deviceId);
if (currentArea == null) {
return success(DeviceLocationDTO.builder()
.deviceId(deviceId)
.inArea(false)
.build());
}
return success(DeviceLocationDTO.builder()
.deviceId(deviceId)
.areaId(currentArea.getAreaId())
.enterTime(currentArea.getEnterTime())
.beaconMac(currentArea.getBeaconMac())
.inArea(true)
.build());
}
}

View File

@@ -0,0 +1,26 @@
package com.viewsh.module.iot.dal.dataobject.integration.clean;
import lombok.Data;
/**
* 轨迹记录功能配置
* <p>
* 存储在 iot_device.config JSON 字段中,与 ButtonEventConfig 同级
* <pre>
* {
* "buttonEvent": { ... },
* "trajectoryTracking": { "enabled": true }
* }
* </pre>
*
* @author lzh
*/
@Data
public class TrajectoryTrackingConfig {
/**
* 是否启用轨迹记录
*/
private Boolean enabled;
}

View File

@@ -0,0 +1,71 @@
package com.viewsh.module.iot.dal.redis.clean;
import jakarta.annotation.Resource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Repository;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* 轨迹检测 RSSI 滑动窗口 Redis DAO
* <p>
* 与 BeaconRssiWindowRedisDAO 功能相同,使用独立的 Key 前缀,
* 避免与保洁到岗检测的窗口数据互相干扰
*
* @author lzh
*/
@Repository
public class TrajectoryRssiWindowRedisDAO {
private static final String WINDOW_KEY_PATTERN = "iot:trajectory:rssi:window:%s:%s";
private static final int WINDOW_TTL_SECONDS = 3600;
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 更新滑动窗口(保留最近 N 个样本)
*/
public void updateWindow(Long deviceId, Long areaId, Integer rssi, Integer maxSize) {
String key = formatKey(deviceId, areaId);
List<Integer> window = getWindow(deviceId, areaId);
window.add(rssi);
if (window.size() > maxSize) {
window = window.subList(window.size() - maxSize, window.size());
}
String windowStr = window.stream()
.map(String::valueOf)
.collect(Collectors.joining(","));
stringRedisTemplate.opsForValue().set(key, windowStr, WINDOW_TTL_SECONDS, TimeUnit.SECONDS);
}
/**
* 获取窗口样本
*/
public List<Integer> getWindow(Long deviceId, Long areaId) {
String key = formatKey(deviceId, areaId);
String windowStr = stringRedisTemplate.opsForValue().get(key);
if (windowStr == null || windowStr.isEmpty()) {
return new ArrayList<>();
}
return List.of(windowStr.split(","))
.stream()
.map(Integer::parseInt)
.collect(Collectors.toList());
}
/**
* 清除窗口
*/
public void clearWindow(Long deviceId, Long areaId) {
String key = formatKey(deviceId, areaId);
stringRedisTemplate.delete(key);
}
private static String formatKey(Long deviceId, Long areaId) {
return String.format(WINDOW_KEY_PATTERN, deviceId, areaId);
}
}

View File

@@ -0,0 +1,109 @@
package com.viewsh.module.iot.dal.redis.clean;
import jakarta.annotation.Resource;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Repository;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* 轨迹实时状态 Redis DAO
* <p>
* 维护每个设备当前所在区域的实时状态
* <p>
* Key: iot:trajectory:current:{deviceId}
* Type: Hash
* Fields: areaId, enterTime, beaconMac
* TTL: 24小时
*
* @author lzh
*/
@Slf4j
@Repository
public class TrajectoryStateRedisDAO {
private static final String CURRENT_KEY_PATTERN = "iot:trajectory:current:%s";
private static final int CURRENT_TTL_SECONDS = 86400; // 24小时
private static final String FIELD_AREA_ID = "areaId";
private static final String FIELD_ENTER_TIME = "enterTime";
private static final String FIELD_BEACON_MAC = "beaconMac";
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 设置当前所在区域
*/
public void setCurrentArea(Long deviceId, Long areaId, long enterTime, String beaconMac) {
String key = formatKey(deviceId);
// 先删后写,清除旧字段(如上次的 beaconMac保证一致性
stringRedisTemplate.delete(key);
Map<String, String> fields = new HashMap<>();
fields.put(FIELD_AREA_ID, String.valueOf(areaId));
fields.put(FIELD_ENTER_TIME, String.valueOf(enterTime));
if (beaconMac != null) {
fields.put(FIELD_BEACON_MAC, beaconMac);
}
stringRedisTemplate.opsForHash().putAll(key, fields);
stringRedisTemplate.expire(key, CURRENT_TTL_SECONDS, TimeUnit.SECONDS);
}
/**
* 获取当前所在区域信息
*
* @return 当前区域信息,如果不在任何区域则返回 null
*/
public CurrentAreaInfo getCurrentArea(Long deviceId) {
String key = formatKey(deviceId);
Map<Object, Object> entries = stringRedisTemplate.opsForHash().entries(key);
if (entries == null || entries.isEmpty()) {
return null;
}
Object areaIdObj = entries.get(FIELD_AREA_ID);
Object enterTimeObj = entries.get(FIELD_ENTER_TIME);
if (areaIdObj == null || enterTimeObj == null) {
return null;
}
CurrentAreaInfo info = new CurrentAreaInfo();
info.setAreaId(Long.parseLong(areaIdObj.toString()));
info.setEnterTime(Long.parseLong(enterTimeObj.toString()));
Object beaconMacObj = entries.get(FIELD_BEACON_MAC);
if (beaconMacObj != null) {
info.setBeaconMac(beaconMacObj.toString());
}
return info;
}
/**
* 清除当前区域状态
*/
public void clearCurrentArea(Long deviceId) {
String key = formatKey(deviceId);
stringRedisTemplate.delete(key);
}
private static String formatKey(Long deviceId) {
return String.format(CURRENT_KEY_PATTERN, deviceId);
}
/**
* 当前区域信息
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class CurrentAreaInfo {
private Long areaId;
private Long enterTime;
private String beaconMac;
}
}

View File

@@ -0,0 +1,51 @@
package com.viewsh.module.iot.service.integration.clean;
import com.viewsh.module.iot.dal.dataobject.integration.clean.BeaconPresenceConfig;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Map;
/**
* Beacon 全量注册表服务
* <p>
* 为轨迹检测功能提供 beaconMac → {areaId, config} 的全量映射
* <p>
* 数据来源Ops 模块的 ops_area_device_relation 表relationType=BEACON
* 缓存Redis Hash1小时 TTL启动预热 + 定时刷新
*
* @author lzh
*/
public interface BeaconRegistryService {
/**
* 获取所有已注册的 Beacon 配置
*
* @return beaconMac(大写) → BeaconAreaInfo 的映射
*/
Map<String, BeaconAreaInfo> getAllBeaconConfigs();
/**
* 刷新 Beacon 注册表缓存
*/
void refreshRegistry();
/**
* Beacon 与区域的关联信息
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
class BeaconAreaInfo {
/**
* 区域ID
*/
private Long areaId;
/**
* Beacon 检测配置
*/
private BeaconPresenceConfig config;
}
}

View File

@@ -0,0 +1,165 @@
package com.viewsh.module.iot.service.integration.clean;
import com.viewsh.framework.common.pojo.CommonResult;
import com.viewsh.framework.common.util.json.JsonUtils;
import com.viewsh.module.iot.dal.dataobject.integration.clean.BeaconPresenceConfig;
import com.viewsh.module.iot.dal.dataobject.integration.clean.CleanOrderIntegrationConfig;
import com.viewsh.module.ops.api.area.AreaDeviceApi;
import com.viewsh.module.ops.api.area.AreaDeviceDTO;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Beacon 全量注册表服务实现
* <p>
* 缓存策略:
* - Redis Hash: iot:trajectory:beacon:registryfield=beaconMacvalue=JSON{areaId,config}
* - TTL: 1小时
* - 启动时预热每30分钟定时刷新
* - 如果 Redis 缓存失效,降级为实时调用 Feign API
*
* @author lzh
*/
@Slf4j
@Service
public class BeaconRegistryServiceImpl implements BeaconRegistryService {
private static final String REGISTRY_KEY = "iot:trajectory:beacon:registry";
private static final int REGISTRY_TTL_SECONDS = 3600; // 1小时
@Resource
private AreaDeviceApi areaDeviceApi;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private CleanOrderIntegrationConfigService configService;
/**
* 启动时预热缓存
*/
@PostConstruct
public void init() {
try {
refreshRegistry();
} catch (Exception e) {
log.warn("[BeaconRegistry] 启动预热失败,将在下次查询时加载", e);
}
}
/**
* 每30分钟刷新一次注册表
*/
@Scheduled(fixedDelay = 30 * 60 * 1000, initialDelay = 30 * 60 * 1000)
public void scheduledRefresh() {
try {
refreshRegistry();
} catch (Exception e) {
log.error("[BeaconRegistry] 定时刷新失败", e);
}
}
@Override
public Map<String, BeaconAreaInfo> getAllBeaconConfigs() {
// 1. 尝试从 Redis 读取
try {
Map<Object, Object> entries = stringRedisTemplate.opsForHash().entries(REGISTRY_KEY);
if (entries != null && !entries.isEmpty()) {
Map<String, BeaconAreaInfo> result = new HashMap<>(entries.size());
for (Map.Entry<Object, Object> entry : entries.entrySet()) {
String mac = entry.getKey().toString();
BeaconAreaInfo info = JsonUtils.parseObject(entry.getValue().toString(), BeaconAreaInfo.class);
if (info != null) {
result.put(mac, info);
}
}
log.debug("[BeaconRegistry] 从缓存加载 {} 个 Beacon 配置", result.size());
return result;
}
} catch (Exception e) {
log.warn("[BeaconRegistry] 读取缓存失败,降级为实时查询", e);
}
// 2. 缓存未命中,实时查询并写入缓存
return loadAndCacheRegistry();
}
@Override
public void refreshRegistry() {
loadAndCacheRegistry();
}
/**
* 从 Ops 加载全量 Beacon 注册表并写入 Redis 缓存
*/
private Map<String, BeaconAreaInfo> loadAndCacheRegistry() {
Map<String, BeaconAreaInfo> registry = new HashMap<>();
try {
CommonResult<List<AreaDeviceDTO>> result = areaDeviceApi.getAllEnabledBeacons();
if (result == null || !result.isSuccess() || result.getData() == null) {
log.warn("[BeaconRegistry] 调用 Ops 获取 Beacon 列表失败");
return registry;
}
Map<String, String> redisEntries = new HashMap<>();
for (AreaDeviceDTO dto : result.getData()) {
if (dto.getConfigData() == null || dto.getAreaId() == null) {
continue;
}
// 反序列化配置
CleanOrderIntegrationConfig integrationConfig = configService.convertConfig(dto.getConfigData());
if (integrationConfig == null || integrationConfig.getBeaconPresence() == null) {
continue;
}
BeaconPresenceConfig beaconConfig = integrationConfig.getBeaconPresence();
if (beaconConfig.getEnabled() == null || !beaconConfig.getEnabled()) {
continue;
}
String mac = beaconConfig.getBeaconMac();
if (mac == null || mac.isEmpty()) {
continue;
}
// 统一转大写
mac = mac.toUpperCase();
BeaconAreaInfo info = new BeaconAreaInfo(dto.getAreaId(), beaconConfig);
registry.put(mac, info);
redisEntries.put(mac, JsonUtils.toJsonString(info));
}
// 写入 Redis使用临时 Key + rename 保证原子性,避免竞态空窗期)
if (!redisEntries.isEmpty()) {
String tempKey = REGISTRY_KEY + ":tmp:" + System.currentTimeMillis();
stringRedisTemplate.opsForHash().putAll(tempKey, redisEntries);
stringRedisTemplate.expire(tempKey, REGISTRY_TTL_SECONDS, TimeUnit.SECONDS);
stringRedisTemplate.rename(tempKey, REGISTRY_KEY);
} else {
// 所有 Beacon 都被禁用时,清除旧缓存
stringRedisTemplate.delete(REGISTRY_KEY);
}
log.info("[BeaconRegistry] 加载并缓存 {} 个 Beacon 配置", registry.size());
} catch (Exception e) {
log.error("[BeaconRegistry] 加载 Beacon 注册表失败", e);
}
return registry;
}
}

View File

@@ -5,6 +5,7 @@ import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
import com.viewsh.module.iot.service.rule.clean.processor.BeaconDetectionRuleProcessor;
import com.viewsh.module.iot.service.rule.clean.processor.ButtonEventRuleProcessor;
import com.viewsh.module.iot.service.rule.clean.processor.TrafficThresholdRuleProcessor;
import com.viewsh.module.iot.service.rule.clean.processor.TrajectoryDetectionProcessor;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -33,6 +34,9 @@ public class CleanRuleProcessorManager {
@Resource
private ButtonEventRuleProcessor buttonEventRuleProcessor;
@Resource
private TrajectoryDetectionProcessor trajectoryDetectionProcessor;
/**
* 处理设备消息
* <p>
@@ -76,6 +80,8 @@ public class CleanRuleProcessorManager {
// 避免退出检测窗口因无数据而停滞
if (!data.containsKey("bluetoothDevices")) {
beaconDetectionRuleProcessor.processPropertyChange(deviceId, "bluetoothDevices", null);
// 轨迹检测同样需要信号丢失补偿,注入 null 使窗口写入 -999
trajectoryDetectionProcessor.processPropertyChange(deviceId, "bluetoothDevices", null);
}
}
}
@@ -126,8 +132,11 @@ public class CleanRuleProcessorManager {
switch (identifier) {
case "people_in", "people_out" ->
trafficThresholdRuleProcessor.processPropertyChange(deviceId, identifier, value);
case "bluetoothDevices" ->
beaconDetectionRuleProcessor.processPropertyChange(deviceId, identifier, value);
case "bluetoothDevices" -> {
beaconDetectionRuleProcessor.processPropertyChange(deviceId, identifier, value);
// 轨迹检测:独立于保洁到岗检测,匹配所有已知 Beacon
trajectoryDetectionProcessor.processPropertyChange(deviceId, identifier, value);
}
default -> {
// 其他属性/事件忽略
}

View File

@@ -0,0 +1,432 @@
package com.viewsh.module.iot.service.rule.clean.processor;
import com.viewsh.framework.common.util.json.JsonUtils;
import com.viewsh.module.iot.core.integration.constants.TrajectoryTopics;
import com.viewsh.module.iot.core.integration.event.trajectory.TrajectoryEnterEvent;
import com.viewsh.module.iot.core.integration.event.trajectory.TrajectoryLeaveEvent;
import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO;
import com.viewsh.module.iot.dal.dataobject.integration.clean.BeaconPresenceConfig;
import com.viewsh.module.iot.dal.dataobject.integration.clean.TrajectoryTrackingConfig;
import com.viewsh.module.iot.dal.redis.clean.TrajectoryRssiWindowRedisDAO;
import com.viewsh.module.iot.dal.redis.clean.TrajectoryStateRedisDAO;
import com.viewsh.module.iot.service.device.IotDeviceService;
import com.viewsh.module.iot.service.integration.clean.BeaconRegistryService;
import com.viewsh.module.iot.service.integration.clean.BeaconRegistryService.BeaconAreaInfo;
import com.viewsh.module.iot.service.rule.clean.detector.RssiSlidingWindowDetector;
import com.viewsh.module.iot.service.rule.clean.detector.RssiSlidingWindowDetector.AreaState;
import com.viewsh.module.iot.service.rule.clean.detector.RssiSlidingWindowDetector.DetectionResult;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* 轨迹检测处理器
* <p>
* 工牌每次上报蓝牙数据时,匹配系统中所有已知 Beacon
* 基于"强进弱出"双阈值算法判断进出区域,自动记录轨迹。
* <p>
* 同一时刻只认一个区域,复用现有 {@link RssiSlidingWindowDetector}。
* <p>
* 核心流程:
* <ol>
* <li>检查设备是否开启轨迹功能</li>
* <li>获取全量 Beacon 注册表</li>
* <li>提取蓝牙列表中所有已知 Beacon 的 RSSI</li>
* <li>更新各区域的 RSSI 滑动窗口</li>
* <li>检测当前区域退出 / 新区域进入</li>
* <li>发布进入/离开事件到 RocketMQ</li>
* </ol>
*
* @author lzh
*/
@Component
@Slf4j
public class TrajectoryDetectionProcessor {
/**
* 设备轨迹功能开关缓存 Key
*/
private static final String DEVICE_ENABLED_KEY_PATTERN = "iot:trajectory:device:enabled:%s";
private static final int DEVICE_ENABLED_TTL_SECONDS = 3600; // 1小时
@Resource
private BeaconRegistryService beaconRegistryService;
@Resource
private TrajectoryRssiWindowRedisDAO windowRedisDAO;
@Resource
private TrajectoryStateRedisDAO stateRedisDAO;
@Resource
private RssiSlidingWindowDetector detector;
@Resource
private IotDeviceService deviceService;
@Resource
private RocketMQTemplate rocketMQTemplate;
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 处理蓝牙属性上报
*
* @param deviceId 设备ID工牌
* @param identifier 属性标识符
* @param propertyValue 蓝牙设备列表
*/
public void processPropertyChange(Long deviceId, String identifier, Object propertyValue) {
if (!"bluetoothDevices".equals(identifier)) {
return;
}
// 1. 检查设备是否开启轨迹功能
if (!isTrajectoryEnabled(deviceId)) {
return;
}
// 2. 信号丢失补偿快速路径propertyValue 为 null 且设备不在任何区域时,直接跳过
// 避免每次无蓝牙数据的属性上报都查询 Beacon 注册表
if (propertyValue == null && stateRedisDAO.getCurrentArea(deviceId) == null) {
return;
}
// 3. 获取 Beacon 注册表
Map<String, BeaconAreaInfo> beaconRegistry = beaconRegistryService.getAllBeaconConfigs();
if (beaconRegistry == null || beaconRegistry.isEmpty()) {
log.debug("[Trajectory] Beacon 注册表为空跳过检测deviceId={}", deviceId);
return;
}
// 4. 构建 areaId → config 索引(避免后续 O(n) 扫描)
Map<Long, BeaconPresenceConfig> areaConfigIndex = new HashMap<>();
for (BeaconAreaInfo info : beaconRegistry.values()) {
areaConfigIndex.putIfAbsent(info.getAreaId(), info.getConfig());
}
// 5. 提取蓝牙列表中所有已知 Beacon 的 RSSI
Map<Long, MatchedBeacon> matchedBeacons = extractMatchedBeacons(propertyValue, beaconRegistry);
// 6. 获取当前所在区域
TrajectoryStateRedisDAO.CurrentAreaInfo currentArea = stateRedisDAO.getCurrentArea(deviceId);
// 7. 对所有匹配区域更新滑动窗口
updateAllWindows(deviceId, matchedBeacons, areaConfigIndex, currentArea);
// 8. 处理区域状态变化
if (currentArea != null) {
processWithCurrentArea(deviceId, currentArea, matchedBeacons, areaConfigIndex);
} else {
processWithoutCurrentArea(deviceId, matchedBeacons, areaConfigIndex);
}
}
// ==================== 核心检测逻辑 ====================
/**
* 当前在某区域时的处理逻辑
*/
private void processWithCurrentArea(Long deviceId,
TrajectoryStateRedisDAO.CurrentAreaInfo currentArea,
Map<Long, MatchedBeacon> matchedBeacons,
Map<Long, BeaconPresenceConfig> areaConfigIndex) {
Long currentAreaId = currentArea.getAreaId();
// 6a. 检查当前区域的退出条件
BeaconPresenceConfig currentConfig = areaConfigIndex.get(currentAreaId);
if (currentConfig != null) {
List<Integer> window = windowRedisDAO.getWindow(deviceId, currentAreaId);
DetectionResult exitResult = detector.detect(
window,
currentConfig.getEnter(),
currentConfig.getExit(),
AreaState.IN_AREA);
if (exitResult == DetectionResult.LEAVE_CONFIRMED) {
// 确认离开当前区域
publishLeaveEvent(deviceId, currentAreaId, currentArea.getBeaconMac(),
"SIGNAL_LOSS", currentArea.getEnterTime());
stateRedisDAO.clearCurrentArea(deviceId);
windowRedisDAO.clearWindow(deviceId, currentAreaId);
log.info("[Trajectory] 离开区域deviceId={}, areaId={}, reason=SIGNAL_LOSS", deviceId, currentAreaId);
// 离开后,尝试进入新区域
processWithoutCurrentArea(deviceId, matchedBeacons, areaConfigIndex);
return;
}
}
// 6b. 当前区域未退出,检查是否有更强区域触发切换
MatchedBeacon bestCandidate = findBestEnterCandidate(deviceId, matchedBeacons, currentAreaId);
if (bestCandidate != null && !bestCandidate.areaId.equals(currentAreaId)) {
// 区域切换:先离开当前区域,再进入新区域
publishLeaveEvent(deviceId, currentAreaId, currentArea.getBeaconMac(),
"AREA_SWITCH", currentArea.getEnterTime());
windowRedisDAO.clearWindow(deviceId, currentAreaId);
long now = System.currentTimeMillis();
stateRedisDAO.setCurrentArea(deviceId, bestCandidate.areaId, now, bestCandidate.beaconMac);
windowRedisDAO.clearWindow(deviceId, bestCandidate.areaId);
publishEnterEvent(deviceId, bestCandidate.areaId, bestCandidate.beaconMac, bestCandidate.rssi);
log.info("[Trajectory] 区域切换deviceId={}, from={}, to={}", deviceId, currentAreaId, bestCandidate.areaId);
}
}
/**
* 当前不在任何区域时的处理逻辑
*/
private void processWithoutCurrentArea(Long deviceId,
Map<Long, MatchedBeacon> matchedBeacons,
Map<Long, BeaconPresenceConfig> areaConfigIndex) {
MatchedBeacon bestCandidate = findBestEnterCandidate(deviceId, matchedBeacons, null);
if (bestCandidate != null) {
long now = System.currentTimeMillis();
stateRedisDAO.setCurrentArea(deviceId, bestCandidate.areaId, now, bestCandidate.beaconMac);
windowRedisDAO.clearWindow(deviceId, bestCandidate.areaId);
publishEnterEvent(deviceId, bestCandidate.areaId, bestCandidate.beaconMac, bestCandidate.rssi);
log.info("[Trajectory] 进入区域deviceId={}, areaId={}, rssi={}", deviceId, bestCandidate.areaId, bestCandidate.rssi);
}
}
// ==================== 辅助方法 ====================
/**
* 从蓝牙列表中提取所有已知 Beacon 的 RSSI
* 同一区域如果有多个 Beacon 匹配,取 RSSI 最强的
*/
private Map<Long, MatchedBeacon> extractMatchedBeacons(Object bluetoothDevices,
Map<String, BeaconAreaInfo> beaconRegistry) {
Map<Long, MatchedBeacon> result = new HashMap<>();
if (bluetoothDevices == null) {
return result;
}
try {
if (!(bluetoothDevices instanceof List)) {
log.warn("[Trajectory] bluetoothDevices 类型不正确: {}", bluetoothDevices.getClass().getName());
return result;
}
@SuppressWarnings("unchecked")
List<Map<String, Object>> deviceList = (List<Map<String, Object>>) bluetoothDevices;
for (Map<String, Object> device : deviceList) {
String mac = (String) device.get("mac");
if (mac == null) {
continue;
}
mac = mac.toUpperCase();
BeaconAreaInfo info = beaconRegistry.get(mac);
if (info == null) {
continue;
}
Integer rssi = toInt(device.get("rssi"));
if (rssi == null) {
continue;
}
Long areaId = info.getAreaId();
MatchedBeacon existing = result.get(areaId);
if (existing == null || rssi > existing.rssi) {
result.put(areaId, new MatchedBeacon(areaId, mac, rssi, info.getConfig()));
}
}
} catch (Exception e) {
log.error("[Trajectory] 解析蓝牙数据失败", e);
}
return result;
}
/**
* 更新所有匹配区域的滑动窗口
* 对于当前所在但未匹配到的区域,注入 -999信号缺失
*/
private void updateAllWindows(Long deviceId,
Map<Long, MatchedBeacon> matchedBeacons,
Map<Long, BeaconPresenceConfig> areaConfigIndex,
TrajectoryStateRedisDAO.CurrentAreaInfo currentArea) {
// 更新匹配到的区域
for (Map.Entry<Long, MatchedBeacon> entry : matchedBeacons.entrySet()) {
MatchedBeacon mb = entry.getValue();
int maxWindowSize = Math.max(
mb.config.getEnter().getWindowSize(),
mb.config.getExit().getWindowSize());
windowRedisDAO.updateWindow(deviceId, mb.areaId, mb.rssi, maxWindowSize);
}
// 当前区域未匹配到 Beacon 时,注入 -999 补偿
if (currentArea != null && !matchedBeacons.containsKey(currentArea.getAreaId())) {
BeaconPresenceConfig currentConfig = areaConfigIndex.get(currentArea.getAreaId());
if (currentConfig != null) {
int maxWindowSize = Math.max(
currentConfig.getEnter().getWindowSize(),
currentConfig.getExit().getWindowSize());
windowRedisDAO.updateWindow(deviceId, currentArea.getAreaId(), -999, maxWindowSize);
}
}
}
/**
* 找到信号最强且满足进入条件的候选区域
*
* @param excludeAreaId 排除的区域ID当前所在区域null 表示不排除
*/
private MatchedBeacon findBestEnterCandidate(Long deviceId,
Map<Long, MatchedBeacon> matchedBeacons,
Long excludeAreaId) {
MatchedBeacon best = null;
for (Map.Entry<Long, MatchedBeacon> entry : matchedBeacons.entrySet()) {
Long areaId = entry.getKey();
if (areaId.equals(excludeAreaId)) {
continue;
}
MatchedBeacon mb = entry.getValue();
List<Integer> window = windowRedisDAO.getWindow(deviceId, areaId);
DetectionResult result = detector.detect(
window,
mb.config.getEnter(),
mb.config.getExit(),
AreaState.OUT_AREA);
if (result == DetectionResult.ARRIVE_CONFIRMED) {
if (best == null || mb.rssi > best.rssi) {
best = mb;
}
}
}
return best;
}
// ==================== 功能开关 ====================
/**
* 检查设备是否开启轨迹功能
* 结果缓存1小时
*/
private boolean isTrajectoryEnabled(Long deviceId) {
String cacheKey = String.format(DEVICE_ENABLED_KEY_PATTERN, deviceId);
// 先读缓存
try {
String cached = stringRedisTemplate.opsForValue().get(cacheKey);
if (cached != null) {
return "true".equals(cached);
}
} catch (Exception e) {
log.warn("[Trajectory] 读取功能开关缓存失败deviceId={}", deviceId, e);
}
// 缓存未命中,从设备配置读取
boolean enabled = false;
try {
IotDeviceDO device = deviceService.getDeviceFromCache(deviceId);
if (device != null && device.getConfig() != null) {
@SuppressWarnings("unchecked")
Map<String, Object> configMap = JsonUtils.parseObject(device.getConfig(), Map.class);
if (configMap != null && configMap.containsKey("trajectoryTracking")) {
Object ttObj = configMap.get("trajectoryTracking");
TrajectoryTrackingConfig ttConfig = JsonUtils.parseObject(
JsonUtils.toJsonString(ttObj), TrajectoryTrackingConfig.class);
enabled = ttConfig != null && Boolean.TRUE.equals(ttConfig.getEnabled());
}
}
} catch (Exception e) {
log.error("[Trajectory] 获取轨迹配置失败deviceId={}", deviceId, e);
}
// 写入缓存
try {
stringRedisTemplate.opsForValue().set(cacheKey, String.valueOf(enabled),
DEVICE_ENABLED_TTL_SECONDS, TimeUnit.SECONDS);
} catch (Exception e) {
log.warn("[Trajectory] 写入功能开关缓存失败deviceId={}", deviceId, e);
}
return enabled;
}
// ==================== 事件发布 ====================
private void publishEnterEvent(Long deviceId, Long areaId, String beaconMac, Integer enterRssi) {
try {
IotDeviceDO device = deviceService.getDeviceFromCache(deviceId);
TrajectoryEnterEvent event = TrajectoryEnterEvent.builder()
.deviceId(deviceId)
.deviceName(device != null ? device.getDeviceName() : null)
.nickname(device != null ? device.getNickname() : null)
.areaId(areaId)
.beaconMac(beaconMac)
.enterRssi(enterRssi)
.tenantId(TenantContextHolder.getTenantId())
.build();
rocketMQTemplate.syncSend(TrajectoryTopics.TRAJECTORY_ENTER,
MessageBuilder.withPayload(event).build());
log.info("[Trajectory] 发布进入事件eventId={}, deviceId={}, areaId={}",
event.getEventId(), deviceId, areaId);
} catch (Exception e) {
log.error("[Trajectory] 发布进入事件失败deviceId={}, areaId={}", deviceId, areaId, e);
}
}
private void publishLeaveEvent(Long deviceId, Long areaId, String beaconMac,
String leaveReason, Long enterTimestamp) {
try {
IotDeviceDO device = deviceService.getDeviceFromCache(deviceId);
TrajectoryLeaveEvent event = TrajectoryLeaveEvent.builder()
.deviceId(deviceId)
.deviceName(device != null ? device.getDeviceName() : null)
.nickname(device != null ? device.getNickname() : null)
.areaId(areaId)
.beaconMac(beaconMac)
.leaveReason(leaveReason)
.enterTimestamp(enterTimestamp)
.tenantId(TenantContextHolder.getTenantId())
.build();
rocketMQTemplate.syncSend(TrajectoryTopics.TRAJECTORY_LEAVE,
MessageBuilder.withPayload(event).build());
log.info("[Trajectory] 发布离开事件eventId={}, deviceId={}, areaId={}, reason={}",
event.getEventId(), deviceId, areaId, leaveReason);
} catch (Exception e) {
log.error("[Trajectory] 发布离开事件失败deviceId={}, areaId={}", deviceId, areaId, e);
}
}
private Integer toInt(Object value) {
if (value instanceof Integer) {
return (Integer) value;
}
if (value instanceof Number) {
return ((Number) value).intValue();
}
return null;
}
/**
* 匹配到的 Beacon 信息
*/
private record MatchedBeacon(Long areaId, String beaconMac, Integer rssi, BeaconPresenceConfig config) {
}
}

View File

@@ -39,14 +39,20 @@ public interface AreaDeviceApi {
@PathVariable("areaId") Long areaId
);
@GetMapping(PREFIX + "/{areaId}/devices")
@Operation(summary = "查询区域设备列表(按类型)")
CommonResult<List<AreaDeviceDTO>> getDevicesByAreaAndType(
@Parameter(description = "区域ID", required = true, example = "1302")
@PathVariable("areaId") Long areaId,
@Parameter(description = "关联类型BADGE/TRAFFIC_COUNTER/BEACON", required = false, example = "BADGE")
@RequestParam(value = "relationType", required = false) String relationType
);
@GetMapping(PREFIX + "/{areaId}/devices")
@Operation(summary = "查询区域设备列表(按类型)")
CommonResult<List<AreaDeviceDTO>> getDevicesByAreaAndType(
@Parameter(description = "区域ID", required = true, example = "1302")
@PathVariable("areaId") Long areaId,
@Parameter(description = "关联类型BADGE/TRAFFIC_COUNTER/BEACON", required = false, example = "BADGE")
@RequestParam(value = "relationType", required = false) String relationType
);
// ==================== 全量查询 ====================
@GetMapping(PREFIX + "/beacons/all")
@Operation(summary = "查询所有启用的Beacon设备轨迹检测用")
CommonResult<List<AreaDeviceDTO>> getAllEnabledBeacons();
// ==================== 按设备查询 ====================

View File

@@ -82,6 +82,23 @@ public interface AreaDeviceService {
*/
List<Long> getDeviceIdsByAreaAndType(Long areaId, String relationType);
/**
* 查询所有启用的指定类型设备关联
*
* @param relationType 关联类型BADGE/BEACON/TRAFFIC_COUNTER
* @return 所有启用的指定类型关联关系
*/
List<OpsAreaDeviceRelationDO> listAllByType(String relationType);
/**
* 查询所有启用的 Beacon 设备关联
* <p>
* 用于轨迹检测功能,获取全量 Beacon 注册表
*
* @return 所有启用的 Beacon 类型关联关系
*/
List<OpsAreaDeviceRelationDO> listAllEnabledBeacons();
/**
* 初始化区域设备配置缓存
* <p>

View File

@@ -163,6 +163,16 @@ public class AreaDeviceServiceImpl implements AreaDeviceService, InitializingBea
.collect(Collectors.toList());
}
@Override
public List<OpsAreaDeviceRelationDO> listAllByType(String relationType) {
return relationMapper.selectListByAreaIdAndRelationType(null, relationType);
}
@Override
public List<OpsAreaDeviceRelationDO> listAllEnabledBeacons() {
return listAllByType("BEACON");
}
@Override
public void initConfigCache() {
log.info("[AreaDevice] 开始初始化区域设备配置缓存...");

View File

@@ -37,6 +37,13 @@ public class AreaDeviceController {
@Resource
private AreaDeviceService areaDeviceService;
@GetMapping("/beacons/all")
@Operation(summary = "查询所有启用的Beacon设备轨迹检测用")
public CommonResult<List<AreaDeviceDTO>> getAllEnabledBeacons() {
List<OpsAreaDeviceRelationDO> relations = areaDeviceService.listAllEnabledBeacons();
return success(BeanUtils.toBean(relations, AreaDeviceDTO.class));
}
@GetMapping("/{areaId}/badges")
@Operation(summary = "查询区域的工牌设备列表")
public CommonResult<List<AreaDeviceDTO>> getBadgesByArea(
@@ -45,21 +52,21 @@ public class AreaDeviceController {
return success(BeanUtils.toBean(relations, AreaDeviceDTO.class));
}
@GetMapping("/{areaId}/devices")
@Operation(summary = "查询区域设备列表(按类型)")
public CommonResult<List<AreaDeviceDTO>> getDevicesByAreaAndType(
@PathVariable("areaId") Long areaId,
@RequestParam(value = "relationType", required = false) String relationType) {
List<OpsAreaDeviceRelationDO> relations;
if (relationType != null) {
relations = areaDeviceService.listByAreaIdAndType(areaId, relationType);
} else {
relations = areaDeviceService.listByAreaId(areaId);
}
return success(BeanUtils.toBean(relations, AreaDeviceDTO.class));
}
@GetMapping("/{areaId}/devices")
@Operation(summary = "查询区域设备列表(按类型)")
public CommonResult<List<AreaDeviceDTO>> getDevicesByAreaAndType(
@PathVariable("areaId") Long areaId,
@RequestParam(value = "relationType", required = false) String relationType) {
List<OpsAreaDeviceRelationDO> relations;
if (relationType != null) {
relations = areaDeviceService.listByAreaIdAndType(areaId, relationType);
} else {
relations = areaDeviceService.listByAreaId(areaId);
}
return success(BeanUtils.toBean(relations, AreaDeviceDTO.class));
}
@GetMapping("/device/{deviceId}/relation")
@Operation(summary = "查询设备的关联关系")