Merge branch 'master' into feat/multi-tenant

# Conflicts:
#	viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/dal/redis/TrafficActiveOrderRedisDAO.java
#	viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/badge/BadgeDeviceStatusServiceImpl.java
#	viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/UserDispatchStatusServiceImpl.java
This commit is contained in:
lzh
2026-04-13 14:35:27 +08:00
57 changed files with 3612 additions and 106 deletions

View File

@@ -0,0 +1,46 @@
package com.viewsh.module.iot.controller.rpc;
import com.viewsh.framework.common.pojo.CommonResult;
import com.viewsh.module.iot.api.trajectory.DeviceLocationDTO;
import com.viewsh.module.iot.api.trajectory.TrajectoryStateApi;
import com.viewsh.module.iot.dal.redis.clean.TrajectoryStateRedisDAO;
import jakarta.annotation.Resource;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.RestController;
import static com.viewsh.framework.common.pojo.CommonResult.success;
/**
* 轨迹实时状态 RPC Controller
* <p>
* 实现 {@link TrajectoryStateApi},从 Redis 读取设备当前位置
*
* @author lzh
*/
@RestController
@Validated
public class TrajectoryStateController implements TrajectoryStateApi {
@Resource
private TrajectoryStateRedisDAO trajectoryStateRedisDAO;
@Override
public CommonResult<DeviceLocationDTO> getCurrentLocation(Long deviceId) {
TrajectoryStateRedisDAO.CurrentAreaInfo currentArea = trajectoryStateRedisDAO.getCurrentArea(deviceId);
if (currentArea == null) {
return success(DeviceLocationDTO.builder()
.deviceId(deviceId)
.inArea(false)
.build());
}
return success(DeviceLocationDTO.builder()
.deviceId(deviceId)
.areaId(currentArea.getAreaId())
.enterTime(currentArea.getEnterTime())
.beaconMac(currentArea.getBeaconMac())
.inArea(true)
.build());
}
}

View File

@@ -12,6 +12,27 @@ import lombok.Data;
@Data
public class TrafficThresholdConfig {
/**
* 上报模式INCREMENTAL / CUMULATIVE
*/
public static final String REPORT_MODE_INCREMENTAL = "INCREMENTAL";
public static final String REPORT_MODE_CUMULATIVE = "CUMULATIVE";
/**
* 设备上报模式
* <p>
* INCREMENTAL默认设备上报增量值直接使用
* CUMULATIVE设备上报累计值需计算差值得到增量
*/
private String reportMode;
/**
* 是否累计值模式
*/
public boolean isCumulative() {
return REPORT_MODE_CUMULATIVE.equalsIgnoreCase(reportMode);
}
/**
* 触发阈值
* <p>

View File

@@ -0,0 +1,26 @@
package com.viewsh.module.iot.dal.dataobject.integration.clean;
import lombok.Data;
/**
* 轨迹记录功能配置
* <p>
* 存储在 iot_device.config JSON 字段中,与 ButtonEventConfig 同级
* <pre>
* {
* "buttonEvent": { ... },
* "trajectoryTracking": { "enabled": true }
* }
* </pre>
*
* @author lzh
*/
@Data
public class TrajectoryTrackingConfig {
/**
* 是否启用轨迹记录
*/
private Boolean enabled;
}

View File

@@ -244,6 +244,58 @@ public class TrafficCounterRedisDAO {
return null;
}
// ==================== 累计值设备上次值存取 ====================
/**
* 累计值设备上次上报值 Key 模式
* <p>
* 格式iot:clean:traffic:lastvalue:{deviceId}
* Hash Fieldpeople_in / people_out
*/
private static final String LAST_VALUE_KEY_PATTERN = "iot:clean:traffic:lastvalue:%s";
/**
* 累计值 TTL- 7 天,跨天不丢失,设备离线数天后仍能衔接
*/
private static final int LAST_VALUE_TTL_SECONDS = 604800;
/**
* 获取设备上次上报的累计值
*
* @param deviceId 设备ID
* @param identifier 属性标识符people_in / people_out
* @return 上次累计值,首次上报时返回 null
*/
public Long getLastCumulativeValue(Long deviceId, String identifier) {
String key = String.format(LAST_VALUE_KEY_PATTERN, deviceId);
Object value = stringRedisTemplate.opsForHash().get(key, identifier);
if (value == null) {
return null;
}
try {
return Long.parseLong(value.toString());
} catch (NumberFormatException e) {
return null;
}
}
/**
* 更新设备上次上报的累计值
*
* @param deviceId 设备ID
* @param identifier 属性标识符people_in / people_out
* @param value 当前累计值
*/
public void setLastCumulativeValue(Long deviceId, String identifier, long value) {
String key = String.format(LAST_VALUE_KEY_PATTERN, deviceId);
stringRedisTemplate.opsForHash().put(key, identifier, String.valueOf(value));
// 仅在 key 无 TTL 时设置,避免高频上报场景下每次都执行 EXPIRE
Long ttl = stringRedisTemplate.getExpire(key, TimeUnit.SECONDS);
if (ttl == null || ttl == -1) {
stringRedisTemplate.expire(key, LAST_VALUE_TTL_SECONDS, TimeUnit.SECONDS);
}
}
// ==================== 私有方法 ====================
private static String formatThresholdKey(Long deviceId, Long areaId) {

View File

@@ -0,0 +1,71 @@
package com.viewsh.module.iot.dal.redis.clean;
import jakarta.annotation.Resource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Repository;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* 轨迹检测 RSSI 滑动窗口 Redis DAO
* <p>
* 与 BeaconRssiWindowRedisDAO 功能相同,使用独立的 Key 前缀,
* 避免与保洁到岗检测的窗口数据互相干扰
*
* @author lzh
*/
@Repository
public class TrajectoryRssiWindowRedisDAO {
private static final String WINDOW_KEY_PATTERN = "iot:trajectory:rssi:window:%s:%s";
private static final int WINDOW_TTL_SECONDS = 3600;
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 更新滑动窗口(保留最近 N 个样本)
*/
public void updateWindow(Long deviceId, Long areaId, Integer rssi, Integer maxSize) {
String key = formatKey(deviceId, areaId);
List<Integer> window = getWindow(deviceId, areaId);
window.add(rssi);
if (window.size() > maxSize) {
window = window.subList(window.size() - maxSize, window.size());
}
String windowStr = window.stream()
.map(String::valueOf)
.collect(Collectors.joining(","));
stringRedisTemplate.opsForValue().set(key, windowStr, WINDOW_TTL_SECONDS, TimeUnit.SECONDS);
}
/**
* 获取窗口样本
*/
public List<Integer> getWindow(Long deviceId, Long areaId) {
String key = formatKey(deviceId, areaId);
String windowStr = stringRedisTemplate.opsForValue().get(key);
if (windowStr == null || windowStr.isEmpty()) {
return new ArrayList<>();
}
return List.of(windowStr.split(","))
.stream()
.map(Integer::parseInt)
.collect(Collectors.toList());
}
/**
* 清除窗口
*/
public void clearWindow(Long deviceId, Long areaId) {
String key = formatKey(deviceId, areaId);
stringRedisTemplate.delete(key);
}
private static String formatKey(Long deviceId, Long areaId) {
return String.format(WINDOW_KEY_PATTERN, deviceId, areaId);
}
}

View File

@@ -0,0 +1,109 @@
package com.viewsh.module.iot.dal.redis.clean;
import jakarta.annotation.Resource;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Repository;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* 轨迹实时状态 Redis DAO
* <p>
* 维护每个设备当前所在区域的实时状态
* <p>
* Key: iot:trajectory:current:{deviceId}
* Type: Hash
* Fields: areaId, enterTime, beaconMac
* TTL: 24小时
*
* @author lzh
*/
@Slf4j
@Repository
public class TrajectoryStateRedisDAO {
private static final String CURRENT_KEY_PATTERN = "iot:trajectory:current:%s";
private static final int CURRENT_TTL_SECONDS = 86400; // 24小时
private static final String FIELD_AREA_ID = "areaId";
private static final String FIELD_ENTER_TIME = "enterTime";
private static final String FIELD_BEACON_MAC = "beaconMac";
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 设置当前所在区域
*/
public void setCurrentArea(Long deviceId, Long areaId, long enterTime, String beaconMac) {
String key = formatKey(deviceId);
// 先删后写,清除旧字段(如上次的 beaconMac保证一致性
stringRedisTemplate.delete(key);
Map<String, String> fields = new HashMap<>();
fields.put(FIELD_AREA_ID, String.valueOf(areaId));
fields.put(FIELD_ENTER_TIME, String.valueOf(enterTime));
if (beaconMac != null) {
fields.put(FIELD_BEACON_MAC, beaconMac);
}
stringRedisTemplate.opsForHash().putAll(key, fields);
stringRedisTemplate.expire(key, CURRENT_TTL_SECONDS, TimeUnit.SECONDS);
}
/**
* 获取当前所在区域信息
*
* @return 当前区域信息,如果不在任何区域则返回 null
*/
public CurrentAreaInfo getCurrentArea(Long deviceId) {
String key = formatKey(deviceId);
Map<Object, Object> entries = stringRedisTemplate.opsForHash().entries(key);
if (entries == null || entries.isEmpty()) {
return null;
}
Object areaIdObj = entries.get(FIELD_AREA_ID);
Object enterTimeObj = entries.get(FIELD_ENTER_TIME);
if (areaIdObj == null || enterTimeObj == null) {
return null;
}
CurrentAreaInfo info = new CurrentAreaInfo();
info.setAreaId(Long.parseLong(areaIdObj.toString()));
info.setEnterTime(Long.parseLong(enterTimeObj.toString()));
Object beaconMacObj = entries.get(FIELD_BEACON_MAC);
if (beaconMacObj != null) {
info.setBeaconMac(beaconMacObj.toString());
}
return info;
}
/**
* 清除当前区域状态
*/
public void clearCurrentArea(Long deviceId) {
String key = formatKey(deviceId);
stringRedisTemplate.delete(key);
}
private static String formatKey(Long deviceId) {
return String.format(CURRENT_KEY_PATTERN, deviceId);
}
/**
* 当前区域信息
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class CurrentAreaInfo {
private Long areaId;
private Long enterTime;
private String beaconMac;
}
}

View File

@@ -0,0 +1,51 @@
package com.viewsh.module.iot.service.integration.clean;
import com.viewsh.module.iot.dal.dataobject.integration.clean.BeaconPresenceConfig;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Map;
/**
* Beacon 全量注册表服务
* <p>
* 为轨迹检测功能提供 beaconMac → {areaId, config} 的全量映射
* <p>
* 数据来源Ops 模块的 ops_area_device_relation 表relationType=BEACON
* 缓存Redis Hash1小时 TTL启动预热 + 定时刷新
*
* @author lzh
*/
public interface BeaconRegistryService {
/**
* 获取所有已注册的 Beacon 配置
*
* @return beaconMac(大写) → BeaconAreaInfo 的映射
*/
Map<String, BeaconAreaInfo> getAllBeaconConfigs();
/**
* 刷新 Beacon 注册表缓存
*/
void refreshRegistry();
/**
* Beacon 与区域的关联信息
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
class BeaconAreaInfo {
/**
* 区域ID
*/
private Long areaId;
/**
* Beacon 检测配置
*/
private BeaconPresenceConfig config;
}
}

View File

@@ -0,0 +1,165 @@
package com.viewsh.module.iot.service.integration.clean;
import com.viewsh.framework.common.pojo.CommonResult;
import com.viewsh.framework.common.util.json.JsonUtils;
import com.viewsh.module.iot.dal.dataobject.integration.clean.BeaconPresenceConfig;
import com.viewsh.module.iot.dal.dataobject.integration.clean.CleanOrderIntegrationConfig;
import com.viewsh.module.ops.api.area.AreaDeviceApi;
import com.viewsh.module.ops.api.area.AreaDeviceDTO;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Beacon 全量注册表服务实现
* <p>
* 缓存策略:
* - Redis Hash: iot:trajectory:beacon:registryfield=beaconMacvalue=JSON{areaId,config}
* - TTL: 1小时
* - 启动时预热每30分钟定时刷新
* - 如果 Redis 缓存失效,降级为实时调用 Feign API
*
* @author lzh
*/
@Slf4j
@Service
public class BeaconRegistryServiceImpl implements BeaconRegistryService {
private static final String REGISTRY_KEY = "iot:trajectory:beacon:registry";
private static final int REGISTRY_TTL_SECONDS = 3600; // 1小时
@Resource
private AreaDeviceApi areaDeviceApi;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private CleanOrderIntegrationConfigService configService;
/**
* 启动时预热缓存
*/
@PostConstruct
public void init() {
try {
refreshRegistry();
} catch (Exception e) {
log.warn("[BeaconRegistry] 启动预热失败,将在下次查询时加载", e);
}
}
/**
* 每30分钟刷新一次注册表
*/
@Scheduled(fixedDelay = 30 * 60 * 1000, initialDelay = 30 * 60 * 1000)
public void scheduledRefresh() {
try {
refreshRegistry();
} catch (Exception e) {
log.error("[BeaconRegistry] 定时刷新失败", e);
}
}
@Override
public Map<String, BeaconAreaInfo> getAllBeaconConfigs() {
// 1. 尝试从 Redis 读取
try {
Map<Object, Object> entries = stringRedisTemplate.opsForHash().entries(REGISTRY_KEY);
if (entries != null && !entries.isEmpty()) {
Map<String, BeaconAreaInfo> result = new HashMap<>(entries.size());
for (Map.Entry<Object, Object> entry : entries.entrySet()) {
String mac = entry.getKey().toString();
BeaconAreaInfo info = JsonUtils.parseObject(entry.getValue().toString(), BeaconAreaInfo.class);
if (info != null) {
result.put(mac, info);
}
}
log.debug("[BeaconRegistry] 从缓存加载 {} 个 Beacon 配置", result.size());
return result;
}
} catch (Exception e) {
log.warn("[BeaconRegistry] 读取缓存失败,降级为实时查询", e);
}
// 2. 缓存未命中,实时查询并写入缓存
return loadAndCacheRegistry();
}
@Override
public void refreshRegistry() {
loadAndCacheRegistry();
}
/**
* 从 Ops 加载全量 Beacon 注册表并写入 Redis 缓存
*/
private Map<String, BeaconAreaInfo> loadAndCacheRegistry() {
Map<String, BeaconAreaInfo> registry = new HashMap<>();
try {
CommonResult<List<AreaDeviceDTO>> result = areaDeviceApi.getAllEnabledBeacons();
if (result == null || !result.isSuccess() || result.getData() == null) {
log.warn("[BeaconRegistry] 调用 Ops 获取 Beacon 列表失败");
return registry;
}
Map<String, String> redisEntries = new HashMap<>();
for (AreaDeviceDTO dto : result.getData()) {
if (dto.getConfigData() == null || dto.getAreaId() == null) {
continue;
}
// 反序列化配置
CleanOrderIntegrationConfig integrationConfig = configService.convertConfig(dto.getConfigData());
if (integrationConfig == null || integrationConfig.getBeaconPresence() == null) {
continue;
}
BeaconPresenceConfig beaconConfig = integrationConfig.getBeaconPresence();
if (beaconConfig.getEnabled() == null || !beaconConfig.getEnabled()) {
continue;
}
String mac = beaconConfig.getBeaconMac();
if (mac == null || mac.isEmpty()) {
continue;
}
// 统一转大写
mac = mac.toUpperCase();
BeaconAreaInfo info = new BeaconAreaInfo(dto.getAreaId(), beaconConfig);
registry.put(mac, info);
redisEntries.put(mac, JsonUtils.toJsonString(info));
}
// 写入 Redis使用临时 Key + rename 保证原子性,避免竞态空窗期)
if (!redisEntries.isEmpty()) {
String tempKey = REGISTRY_KEY + ":tmp:" + System.currentTimeMillis();
stringRedisTemplate.opsForHash().putAll(tempKey, redisEntries);
stringRedisTemplate.expire(tempKey, REGISTRY_TTL_SECONDS, TimeUnit.SECONDS);
stringRedisTemplate.rename(tempKey, REGISTRY_KEY);
} else {
// 所有 Beacon 都被禁用时,清除旧缓存
stringRedisTemplate.delete(REGISTRY_KEY);
}
log.info("[BeaconRegistry] 加载并缓存 {} 个 Beacon 配置", registry.size());
} catch (Exception e) {
log.error("[BeaconRegistry] 加载 Beacon 注册表失败", e);
}
return registry;
}
}

View File

@@ -5,6 +5,7 @@ import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
import com.viewsh.module.iot.service.rule.clean.processor.BeaconDetectionRuleProcessor;
import com.viewsh.module.iot.service.rule.clean.processor.ButtonEventRuleProcessor;
import com.viewsh.module.iot.service.rule.clean.processor.TrafficThresholdRuleProcessor;
import com.viewsh.module.iot.service.rule.clean.processor.TrajectoryDetectionProcessor;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -33,6 +34,9 @@ public class CleanRuleProcessorManager {
@Resource
private ButtonEventRuleProcessor buttonEventRuleProcessor;
@Resource
private TrajectoryDetectionProcessor trajectoryDetectionProcessor;
/**
* 处理设备消息
* <p>
@@ -76,6 +80,8 @@ public class CleanRuleProcessorManager {
// 避免退出检测窗口因无数据而停滞
if (!data.containsKey("bluetoothDevices")) {
beaconDetectionRuleProcessor.processPropertyChange(deviceId, "bluetoothDevices", null);
// 轨迹检测同样需要信号丢失补偿,注入 null 使窗口写入 -999
trajectoryDetectionProcessor.processPropertyChange(deviceId, "bluetoothDevices", null);
}
}
}
@@ -126,8 +132,11 @@ public class CleanRuleProcessorManager {
switch (identifier) {
case "people_in", "people_out" ->
trafficThresholdRuleProcessor.processPropertyChange(deviceId, identifier, value);
case "bluetoothDevices" ->
beaconDetectionRuleProcessor.processPropertyChange(deviceId, identifier, value);
case "bluetoothDevices" -> {
beaconDetectionRuleProcessor.processPropertyChange(deviceId, identifier, value);
// 轨迹检测:独立于保洁到岗检测,匹配所有已知 Beacon
trajectoryDetectionProcessor.processPropertyChange(deviceId, identifier, value);
}
default -> {
// 其他属性/事件忽略
}

View File

@@ -51,12 +51,13 @@ public class TrafficThresholdRuleProcessor {
* - people_in累加到当日统计 + 阈值计数器(需配置)
* - people_out累加到当日统计
* <p>
* 当日累积统计(用于报表)与工单触发(需配置)解耦
* 即使设备未配置工单触发规则,统计数据也会正常采集。
* 支持两种上报模式(通过 configData.trafficThreshold.reportMode 配置):
* - INCREMENTAL默认上报值直接作为增量
* - CUMULATIVE上报值为累计值自动计算差值得到增量
*
* @param deviceId 设备ID
* @param identifier 属性标识符people_in 或 people_out
* @param propertyValue 属性值(周期内增量)
* @param propertyValue 属性值(增量或累计值,取决于 reportMode
*/
public void processPropertyChange(Long deviceId, String identifier, Object propertyValue) {
// 1. 校验属性类型
@@ -67,15 +68,28 @@ public class TrafficThresholdRuleProcessor {
log.debug("[TrafficThreshold] 收到客流属性deviceId={}, identifier={}, value={}",
deviceId, identifier, propertyValue);
// 2. 解析增量
Long increment = parseTrafficCount(propertyValue);
if (increment == null || increment <= 0) {
log.debug("[TrafficThreshold] 增量值无效deviceId={}, identifier={}, value={}",
deviceId, identifier, propertyValue);
// 2. 解析原始
Long rawValue = parseTrafficCount(propertyValue);
if (rawValue == null || rawValue <= 0) {
return;
}
// 3. 无条件累加到当日统计(统计与工单触发解耦)
// 3. 获取配置,判断上报模式
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper = getConfigWrapper(deviceId);
TrafficThresholdConfig thresholdConfig = resolveThresholdConfig(configWrapper);
// 4. 根据上报模式计算增量
long increment;
if (thresholdConfig != null && thresholdConfig.isCumulative()) {
increment = resolveIncrement(deviceId, identifier, rawValue);
} else {
increment = rawValue;
}
if (increment <= 0) {
return;
}
// 5. 累加到当日统计(统计与工单触发解耦)
LocalDate today = LocalDate.now();
if ("people_in".equals(identifier)) {
trafficCounterRedisDAO.incrementDaily(deviceId, today, increment, 0);
@@ -85,21 +99,11 @@ public class TrafficThresholdRuleProcessor {
log.debug("[TrafficThreshold] 当日统计累加deviceId={}, identifier={}, increment={}",
deviceId, identifier, increment);
// 4. 以下为工单触发逻辑,需要设备配置支持
// 6. 以下为工单触发逻辑,仅 people_in 参与
if (!"people_in".equals(identifier)) {
return; // people_out 不参与阈值判定
}
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper =
getConfigWrapper(deviceId);
if (configWrapper == null || configWrapper.getConfig() == null) {
log.debug("[TrafficThreshold] 设备无工单触发配置deviceId={}", deviceId);
return;
}
TrafficThresholdConfig thresholdConfig = configWrapper.getConfig().getTrafficThreshold();
if (thresholdConfig == null || !thresholdConfig.getAutoCreateOrder()) {
log.debug("[TrafficThreshold] 未启用客流阈值触发deviceId={}", deviceId);
if (thresholdConfig == null || !Boolean.TRUE.equals(thresholdConfig.getAutoCreateOrder())) {
return;
}
@@ -107,10 +111,57 @@ public class TrafficThresholdRuleProcessor {
handlePeopleIn(deviceId, areaId, increment, today, thresholdConfig, configWrapper);
}
/**
* 从配置包装器中提取客流阈值配置
*
* @return 阈值配置,无配置时返回 null
*/
private TrafficThresholdConfig resolveThresholdConfig(
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper) {
if (configWrapper == null || configWrapper.getConfig() == null) {
return null;
}
return configWrapper.getConfig().getTrafficThreshold();
}
/**
* 累计值转增量
* <p>
* 通过 Redis 存储上次上报的累计值,计算差值得到本次增量。
* 处理三种场景:首次上报、正常递增、设备重启归零。
*
* @param deviceId 设备ID
* @param identifier 属性标识符
* @param currentValue 本次上报的累计值
* @return 增量值;首次上报返回 0
*/
private long resolveIncrement(Long deviceId, String identifier, long currentValue) {
Long lastValue = trafficCounterRedisDAO.getLastCumulativeValue(deviceId, identifier);
// 无论是否能算出增量,都记录当前值
trafficCounterRedisDAO.setLastCumulativeValue(deviceId, identifier, currentValue);
if (lastValue == null) {
// 首次上报:无历史基准,不计入统计
log.info("[TrafficThreshold] 累计值设备首次上报建立基准deviceId={}, identifier={}, value={}",
deviceId, identifier, currentValue);
return 0;
}
if (currentValue >= lastValue) {
return currentValue - lastValue;
}
// currentValue < lastValue → 设备重启归零
log.info("[TrafficThreshold] 检测到设备重启deviceId={}, identifier={}, last={}, current={}",
deviceId, identifier, lastValue, currentValue);
return currentValue;
}
/**
* 处理 people_in 增量
*/
private void handlePeopleIn(Long deviceId, Long areaId, Long increment, LocalDate today,
private void handlePeopleIn(Long deviceId, Long areaId, long increment, LocalDate today,
TrafficThresholdConfig thresholdConfig,
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper) {
// 1. 原子累加到阈值计数器,返回累积值(当日统计已在 processPropertyChange 中完成)

View File

@@ -0,0 +1,432 @@
package com.viewsh.module.iot.service.rule.clean.processor;
import com.viewsh.framework.common.util.json.JsonUtils;
import com.viewsh.module.iot.core.integration.constants.TrajectoryTopics;
import com.viewsh.module.iot.core.integration.event.trajectory.TrajectoryEnterEvent;
import com.viewsh.module.iot.core.integration.event.trajectory.TrajectoryLeaveEvent;
import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO;
import com.viewsh.module.iot.dal.dataobject.integration.clean.BeaconPresenceConfig;
import com.viewsh.module.iot.dal.dataobject.integration.clean.TrajectoryTrackingConfig;
import com.viewsh.module.iot.dal.redis.clean.TrajectoryRssiWindowRedisDAO;
import com.viewsh.module.iot.dal.redis.clean.TrajectoryStateRedisDAO;
import com.viewsh.module.iot.service.device.IotDeviceService;
import com.viewsh.module.iot.service.integration.clean.BeaconRegistryService;
import com.viewsh.module.iot.service.integration.clean.BeaconRegistryService.BeaconAreaInfo;
import com.viewsh.module.iot.service.rule.clean.detector.RssiSlidingWindowDetector;
import com.viewsh.module.iot.service.rule.clean.detector.RssiSlidingWindowDetector.AreaState;
import com.viewsh.module.iot.service.rule.clean.detector.RssiSlidingWindowDetector.DetectionResult;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* 轨迹检测处理器
* <p>
* 工牌每次上报蓝牙数据时,匹配系统中所有已知 Beacon
* 基于"强进弱出"双阈值算法判断进出区域,自动记录轨迹。
* <p>
* 同一时刻只认一个区域,复用现有 {@link RssiSlidingWindowDetector}。
* <p>
* 核心流程:
* <ol>
* <li>检查设备是否开启轨迹功能</li>
* <li>获取全量 Beacon 注册表</li>
* <li>提取蓝牙列表中所有已知 Beacon 的 RSSI</li>
* <li>更新各区域的 RSSI 滑动窗口</li>
* <li>检测当前区域退出 / 新区域进入</li>
* <li>发布进入/离开事件到 RocketMQ</li>
* </ol>
*
* @author lzh
*/
@Component
@Slf4j
public class TrajectoryDetectionProcessor {
/**
* 设备轨迹功能开关缓存 Key
*/
private static final String DEVICE_ENABLED_KEY_PATTERN = "iot:trajectory:device:enabled:%s";
private static final int DEVICE_ENABLED_TTL_SECONDS = 3600; // 1小时
@Resource
private BeaconRegistryService beaconRegistryService;
@Resource
private TrajectoryRssiWindowRedisDAO windowRedisDAO;
@Resource
private TrajectoryStateRedisDAO stateRedisDAO;
@Resource
private RssiSlidingWindowDetector detector;
@Resource
private IotDeviceService deviceService;
@Resource
private RocketMQTemplate rocketMQTemplate;
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 处理蓝牙属性上报
*
* @param deviceId 设备ID工牌
* @param identifier 属性标识符
* @param propertyValue 蓝牙设备列表
*/
public void processPropertyChange(Long deviceId, String identifier, Object propertyValue) {
if (!"bluetoothDevices".equals(identifier)) {
return;
}
// 1. 检查设备是否开启轨迹功能
if (!isTrajectoryEnabled(deviceId)) {
return;
}
// 2. 信号丢失补偿快速路径propertyValue 为 null 且设备不在任何区域时,直接跳过
// 避免每次无蓝牙数据的属性上报都查询 Beacon 注册表
if (propertyValue == null && stateRedisDAO.getCurrentArea(deviceId) == null) {
return;
}
// 3. 获取 Beacon 注册表
Map<String, BeaconAreaInfo> beaconRegistry = beaconRegistryService.getAllBeaconConfigs();
if (beaconRegistry == null || beaconRegistry.isEmpty()) {
log.debug("[Trajectory] Beacon 注册表为空跳过检测deviceId={}", deviceId);
return;
}
// 4. 构建 areaId → config 索引(避免后续 O(n) 扫描)
Map<Long, BeaconPresenceConfig> areaConfigIndex = new HashMap<>();
for (BeaconAreaInfo info : beaconRegistry.values()) {
areaConfigIndex.putIfAbsent(info.getAreaId(), info.getConfig());
}
// 5. 提取蓝牙列表中所有已知 Beacon 的 RSSI
Map<Long, MatchedBeacon> matchedBeacons = extractMatchedBeacons(propertyValue, beaconRegistry);
// 6. 获取当前所在区域
TrajectoryStateRedisDAO.CurrentAreaInfo currentArea = stateRedisDAO.getCurrentArea(deviceId);
// 7. 对所有匹配区域更新滑动窗口
updateAllWindows(deviceId, matchedBeacons, areaConfigIndex, currentArea);
// 8. 处理区域状态变化
if (currentArea != null) {
processWithCurrentArea(deviceId, currentArea, matchedBeacons, areaConfigIndex);
} else {
processWithoutCurrentArea(deviceId, matchedBeacons, areaConfigIndex);
}
}
// ==================== 核心检测逻辑 ====================
/**
* 当前在某区域时的处理逻辑
*/
private void processWithCurrentArea(Long deviceId,
TrajectoryStateRedisDAO.CurrentAreaInfo currentArea,
Map<Long, MatchedBeacon> matchedBeacons,
Map<Long, BeaconPresenceConfig> areaConfigIndex) {
Long currentAreaId = currentArea.getAreaId();
// 6a. 检查当前区域的退出条件
BeaconPresenceConfig currentConfig = areaConfigIndex.get(currentAreaId);
if (currentConfig != null) {
List<Integer> window = windowRedisDAO.getWindow(deviceId, currentAreaId);
DetectionResult exitResult = detector.detect(
window,
currentConfig.getEnter(),
currentConfig.getExit(),
AreaState.IN_AREA);
if (exitResult == DetectionResult.LEAVE_CONFIRMED) {
// 确认离开当前区域
publishLeaveEvent(deviceId, currentAreaId, currentArea.getBeaconMac(),
"SIGNAL_LOSS", currentArea.getEnterTime());
stateRedisDAO.clearCurrentArea(deviceId);
windowRedisDAO.clearWindow(deviceId, currentAreaId);
log.info("[Trajectory] 离开区域deviceId={}, areaId={}, reason=SIGNAL_LOSS", deviceId, currentAreaId);
// 离开后,尝试进入新区域
processWithoutCurrentArea(deviceId, matchedBeacons, areaConfigIndex);
return;
}
}
// 6b. 当前区域未退出,检查是否有更强区域触发切换
MatchedBeacon bestCandidate = findBestEnterCandidate(deviceId, matchedBeacons, currentAreaId);
if (bestCandidate != null && !bestCandidate.areaId.equals(currentAreaId)) {
// 区域切换:先离开当前区域,再进入新区域
publishLeaveEvent(deviceId, currentAreaId, currentArea.getBeaconMac(),
"AREA_SWITCH", currentArea.getEnterTime());
windowRedisDAO.clearWindow(deviceId, currentAreaId);
long now = System.currentTimeMillis();
stateRedisDAO.setCurrentArea(deviceId, bestCandidate.areaId, now, bestCandidate.beaconMac);
windowRedisDAO.clearWindow(deviceId, bestCandidate.areaId);
publishEnterEvent(deviceId, bestCandidate.areaId, bestCandidate.beaconMac, bestCandidate.rssi);
log.info("[Trajectory] 区域切换deviceId={}, from={}, to={}", deviceId, currentAreaId, bestCandidate.areaId);
}
}
/**
* 当前不在任何区域时的处理逻辑
*/
private void processWithoutCurrentArea(Long deviceId,
Map<Long, MatchedBeacon> matchedBeacons,
Map<Long, BeaconPresenceConfig> areaConfigIndex) {
MatchedBeacon bestCandidate = findBestEnterCandidate(deviceId, matchedBeacons, null);
if (bestCandidate != null) {
long now = System.currentTimeMillis();
stateRedisDAO.setCurrentArea(deviceId, bestCandidate.areaId, now, bestCandidate.beaconMac);
windowRedisDAO.clearWindow(deviceId, bestCandidate.areaId);
publishEnterEvent(deviceId, bestCandidate.areaId, bestCandidate.beaconMac, bestCandidate.rssi);
log.info("[Trajectory] 进入区域deviceId={}, areaId={}, rssi={}", deviceId, bestCandidate.areaId, bestCandidate.rssi);
}
}
// ==================== 辅助方法 ====================
/**
* 从蓝牙列表中提取所有已知 Beacon 的 RSSI
* 同一区域如果有多个 Beacon 匹配,取 RSSI 最强的
*/
private Map<Long, MatchedBeacon> extractMatchedBeacons(Object bluetoothDevices,
Map<String, BeaconAreaInfo> beaconRegistry) {
Map<Long, MatchedBeacon> result = new HashMap<>();
if (bluetoothDevices == null) {
return result;
}
try {
if (!(bluetoothDevices instanceof List)) {
log.warn("[Trajectory] bluetoothDevices 类型不正确: {}", bluetoothDevices.getClass().getName());
return result;
}
@SuppressWarnings("unchecked")
List<Map<String, Object>> deviceList = (List<Map<String, Object>>) bluetoothDevices;
for (Map<String, Object> device : deviceList) {
String mac = (String) device.get("mac");
if (mac == null) {
continue;
}
mac = mac.toUpperCase();
BeaconAreaInfo info = beaconRegistry.get(mac);
if (info == null) {
continue;
}
Integer rssi = toInt(device.get("rssi"));
if (rssi == null) {
continue;
}
Long areaId = info.getAreaId();
MatchedBeacon existing = result.get(areaId);
if (existing == null || rssi > existing.rssi) {
result.put(areaId, new MatchedBeacon(areaId, mac, rssi, info.getConfig()));
}
}
} catch (Exception e) {
log.error("[Trajectory] 解析蓝牙数据失败", e);
}
return result;
}
/**
* 更新所有匹配区域的滑动窗口
* 对于当前所在但未匹配到的区域,注入 -999信号缺失
*/
private void updateAllWindows(Long deviceId,
Map<Long, MatchedBeacon> matchedBeacons,
Map<Long, BeaconPresenceConfig> areaConfigIndex,
TrajectoryStateRedisDAO.CurrentAreaInfo currentArea) {
// 更新匹配到的区域
for (Map.Entry<Long, MatchedBeacon> entry : matchedBeacons.entrySet()) {
MatchedBeacon mb = entry.getValue();
int maxWindowSize = Math.max(
mb.config.getEnter().getWindowSize(),
mb.config.getExit().getWindowSize());
windowRedisDAO.updateWindow(deviceId, mb.areaId, mb.rssi, maxWindowSize);
}
// 当前区域未匹配到 Beacon 时,注入 -999 补偿
if (currentArea != null && !matchedBeacons.containsKey(currentArea.getAreaId())) {
BeaconPresenceConfig currentConfig = areaConfigIndex.get(currentArea.getAreaId());
if (currentConfig != null) {
int maxWindowSize = Math.max(
currentConfig.getEnter().getWindowSize(),
currentConfig.getExit().getWindowSize());
windowRedisDAO.updateWindow(deviceId, currentArea.getAreaId(), -999, maxWindowSize);
}
}
}
/**
* 找到信号最强且满足进入条件的候选区域
*
* @param excludeAreaId 排除的区域ID当前所在区域null 表示不排除
*/
private MatchedBeacon findBestEnterCandidate(Long deviceId,
Map<Long, MatchedBeacon> matchedBeacons,
Long excludeAreaId) {
MatchedBeacon best = null;
for (Map.Entry<Long, MatchedBeacon> entry : matchedBeacons.entrySet()) {
Long areaId = entry.getKey();
if (areaId.equals(excludeAreaId)) {
continue;
}
MatchedBeacon mb = entry.getValue();
List<Integer> window = windowRedisDAO.getWindow(deviceId, areaId);
DetectionResult result = detector.detect(
window,
mb.config.getEnter(),
mb.config.getExit(),
AreaState.OUT_AREA);
if (result == DetectionResult.ARRIVE_CONFIRMED) {
if (best == null || mb.rssi > best.rssi) {
best = mb;
}
}
}
return best;
}
// ==================== 功能开关 ====================
/**
* 检查设备是否开启轨迹功能
* 结果缓存1小时
*/
private boolean isTrajectoryEnabled(Long deviceId) {
String cacheKey = String.format(DEVICE_ENABLED_KEY_PATTERN, deviceId);
// 先读缓存
try {
String cached = stringRedisTemplate.opsForValue().get(cacheKey);
if (cached != null) {
return "true".equals(cached);
}
} catch (Exception e) {
log.warn("[Trajectory] 读取功能开关缓存失败deviceId={}", deviceId, e);
}
// 缓存未命中,从设备配置读取
boolean enabled = false;
try {
IotDeviceDO device = deviceService.getDeviceFromCache(deviceId);
if (device != null && device.getConfig() != null) {
@SuppressWarnings("unchecked")
Map<String, Object> configMap = JsonUtils.parseObject(device.getConfig(), Map.class);
if (configMap != null && configMap.containsKey("trajectoryTracking")) {
Object ttObj = configMap.get("trajectoryTracking");
TrajectoryTrackingConfig ttConfig = JsonUtils.parseObject(
JsonUtils.toJsonString(ttObj), TrajectoryTrackingConfig.class);
enabled = ttConfig != null && Boolean.TRUE.equals(ttConfig.getEnabled());
}
}
} catch (Exception e) {
log.error("[Trajectory] 获取轨迹配置失败deviceId={}", deviceId, e);
}
// 写入缓存
try {
stringRedisTemplate.opsForValue().set(cacheKey, String.valueOf(enabled),
DEVICE_ENABLED_TTL_SECONDS, TimeUnit.SECONDS);
} catch (Exception e) {
log.warn("[Trajectory] 写入功能开关缓存失败deviceId={}", deviceId, e);
}
return enabled;
}
// ==================== 事件发布 ====================
private void publishEnterEvent(Long deviceId, Long areaId, String beaconMac, Integer enterRssi) {
try {
IotDeviceDO device = deviceService.getDeviceFromCache(deviceId);
TrajectoryEnterEvent event = TrajectoryEnterEvent.builder()
.deviceId(deviceId)
.deviceName(device != null ? device.getDeviceName() : null)
.nickname(device != null ? device.getNickname() : null)
.areaId(areaId)
.beaconMac(beaconMac)
.enterRssi(enterRssi)
.tenantId(TenantContextHolder.getTenantId())
.build();
rocketMQTemplate.syncSend(TrajectoryTopics.TRAJECTORY_ENTER,
MessageBuilder.withPayload(event).build());
log.info("[Trajectory] 发布进入事件eventId={}, deviceId={}, areaId={}",
event.getEventId(), deviceId, areaId);
} catch (Exception e) {
log.error("[Trajectory] 发布进入事件失败deviceId={}, areaId={}", deviceId, areaId, e);
}
}
private void publishLeaveEvent(Long deviceId, Long areaId, String beaconMac,
String leaveReason, Long enterTimestamp) {
try {
IotDeviceDO device = deviceService.getDeviceFromCache(deviceId);
TrajectoryLeaveEvent event = TrajectoryLeaveEvent.builder()
.deviceId(deviceId)
.deviceName(device != null ? device.getDeviceName() : null)
.nickname(device != null ? device.getNickname() : null)
.areaId(areaId)
.beaconMac(beaconMac)
.leaveReason(leaveReason)
.enterTimestamp(enterTimestamp)
.tenantId(TenantContextHolder.getTenantId())
.build();
rocketMQTemplate.syncSend(TrajectoryTopics.TRAJECTORY_LEAVE,
MessageBuilder.withPayload(event).build());
log.info("[Trajectory] 发布离开事件eventId={}, deviceId={}, areaId={}, reason={}",
event.getEventId(), deviceId, areaId, leaveReason);
} catch (Exception e) {
log.error("[Trajectory] 发布离开事件失败deviceId={}, areaId={}", deviceId, areaId, e);
}
}
private Integer toInt(Object value) {
if (value instanceof Integer) {
return (Integer) value;
}
if (value instanceof Number) {
return ((Number) value).intValue();
}
return null;
}
/**
* 匹配到的 Beacon 信息
*/
private record MatchedBeacon(Long areaId, String beaconMac, Integer rssi, BeaconPresenceConfig config) {
}
}