3 Commits

Author SHA1 Message Date
lzh
1ca472ea93 feat(iot): 客流计数器支持累计值上报模式(CUMULATIVE)
Some checks failed
Java CI with Maven / build (11) (push) Has been cancelled
Java CI with Maven / build (17) (push) Has been cancelled
Java CI with Maven / build (8) (push) Has been cancelled
TrafficThresholdConfig 新增 reportMode 字段,支持 INCREMENTAL(默认)和 CUMULATIVE 两种模式。
累计值设备通过 Redis 存储上次值自动算差值,处理首次上报跳过和设备重启归零场景。
现有增量设备无需改配置,行为完全兼容。

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-09 13:59:20 +08:00
lzh
c8ba3e63cb feat(iot): 新增恒华D5客流摄像机编解码器,对接拌线人数统计(type=1)
Some checks failed
Java CI with Maven / build (11) (push) Has been cancelled
Java CI with Maven / build (17) (push) Has been cancelled
Java CI with Maven / build (8) (push) Has been cancelled
走通用路由,新增 IotHenghuaD5Codec 解析 form-urlencoded 格式数据,
映射 InNum/OutNum 到 people_in/people_out,业务层完全复用现有客流阈值逻辑。
IotHttpUpstreamHandler 增加恒华D5 专用简洁响应。

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 14:59:53 +08:00
lzh
04c61a41db fix(ops): 修复 CleanBadgeServiceImpl 调用不存在的 queryAreaNameById 方法导致编译失败
Some checks failed
Java CI with Maven / build (11) (push) Has been cancelled
Java CI with Maven / build (17) (push) Has been cancelled
Java CI with Maven / build (8) (push) Has been cancelled
改用 OpsBusAreaDO.getAreaName() 获取区域名称

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 10:45:10 +08:00
7 changed files with 319 additions and 26 deletions

View File

@@ -0,0 +1,112 @@
package com.viewsh.module.iot.gateway.codec.henghua;
import cn.hutool.core.map.MapUtil;
import com.viewsh.framework.common.util.json.JsonUtils;
import com.viewsh.module.iot.core.enums.IotDeviceMessageMethodEnum;
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
import com.viewsh.module.iot.gateway.codec.IotDeviceMessageCodec;
import com.viewsh.module.iot.gateway.codec.henghua.dto.HenghuaD5DataReqVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;
/**
* 恒华 D5 客流摄像机 编解码器
*/
@Slf4j
@Component
public class IotHenghuaD5Codec implements IotDeviceMessageCodec {
public static final String TYPE = "HENGHUA_D5";
@Override
public String type() {
return TYPE;
}
@Override
public IotDeviceMessage decode(byte[] bytes) {
// 1. 将 byte[] 转为 UTF-8 字符串
String body = new String(bytes, StandardCharsets.UTF_8);
// 2. 按 & 拆分参数,构建 key-value Mapvalue 做 URL decode
Map<String, String> params = new HashMap<>();
for (String entry : body.split("&")) {
int idx = entry.indexOf('=');
if (idx > 0) {
String key = URLDecoder.decode(entry.substring(0, idx), StandardCharsets.UTF_8);
String value = URLDecoder.decode(entry.substring(idx + 1), StandardCharsets.UTF_8);
params.put(key, value);
}
}
// 3. 提取 status等于 "0" 时为心跳,直接返回 null
String status = params.get("status");
if ("0".equals(status)) {
return null;
}
// 4. 提取 type当前只对接 type=1
String type = params.get("type");
if (!"1".equals(type)) {
return null;
}
// 5. 提取 data 参数(已在步骤 2 中完成 URL decode
String dataJson = params.get("data");
if (dataJson == null) {
log.warn("[decode][恒华D5 data 参数缺失]");
return null;
}
// 6. 将 JSON 字符串解析为 HenghuaD5DataReqVO
HenghuaD5DataReqVO reqVO;
try {
reqVO = JsonUtils.parseObject(dataJson, HenghuaD5DataReqVO.class);
} catch (Exception e) {
log.error("[decode][解析恒华D5 data JSON 失败]", e);
throw new IllegalArgumentException("data JSON 格式错误");
}
if (reqVO == null) {
log.warn("[decode][恒华D5 data 解析结果为空]");
return null;
}
// 7. 构建属性 Map
Map<String, Object> propertyParams = MapUtil.newHashMap();
propertyParams.put("people_in", reqVO.getInNum());
propertyParams.put("people_out", reqVO.getOutNum());
// 8. 时间校验timeStamp 超过 24 小时的数据舍弃
LocalDateTime reportTime = LocalDateTime.now();
if (reqVO.getTimeStamp() != null) {
reportTime = LocalDateTime.ofInstant(
Instant.ofEpochSecond(reqVO.getTimeStamp()), ZoneId.systemDefault());
long hoursDiff = Math.abs(Duration.between(reportTime, LocalDateTime.now()).toHours());
if (hoursDiff > 24) {
log.warn("[decode][设备时间超过24小时舍弃数据: timeStamp={}, 时间差: {}小时]",
reqVO.getTimeStamp(), hoursDiff);
return null;
}
}
// 9. 返回消息
return IotDeviceMessage.requestOf(IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), propertyParams)
.setReportTime(reportTime);
}
@Override
public byte[] encode(IotDeviceMessage message) {
// 恒华D5 不需要下行指令
return new byte[0];
}
}

View File

@@ -0,0 +1,41 @@
package com.viewsh.module.iot.gateway.codec.henghua.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
/**
* 恒华 D5 客流摄像机 — type=1 拌线统计数据上报请求
*/
@Data
public class HenghuaD5DataReqVO {
/**
* 时间日期字符串,格式 "YYYY-MM-DD HH:MM:SS"
*/
@JsonProperty("DataDateTime")
private String dataDateTime;
/**
* 进入人数(累计值)
*/
@JsonProperty("InNum")
private Integer inNum;
/**
* 离开人数(累计值)
*/
@JsonProperty("OutNum")
private Integer outNum;
/**
* 经过人数(累计值)
*/
@JsonProperty("PassNum")
private Integer passNum;
/**
* Unix 时间戳(秒)
*/
private Long timeStamp;
}

View File

@@ -8,6 +8,7 @@ import cn.hutool.extra.spring.SpringUtil;
import com.viewsh.framework.common.pojo.CommonResult;
import com.viewsh.module.iot.core.biz.dto.IotDeviceRespDTO;
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
import com.viewsh.module.iot.gateway.codec.henghua.IotHenghuaD5Codec;
import com.viewsh.module.iot.gateway.codec.people.IotPeopleCounterCodec;
import com.viewsh.module.iot.gateway.codec.people.dto.PeopleCounterUploadResp;
import com.viewsh.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol;
@@ -49,8 +50,9 @@ public class IotHttpUpstreamHandler extends IotHttpAbstractHandler {
// 2. 获取设备信息(提前获取,避免重复调用)
IotDeviceRespDTO device = deviceService.getDeviceFromCache(productKey, deviceName);
boolean isPeopleCounter = device != null
&& StrUtil.equals(device.getCodecType(), IotPeopleCounterCodec.TYPE);
String codecType = device != null ? device.getCodecType() : null;
boolean isPeopleCounter = StrUtil.equals(codecType, IotPeopleCounterCodec.TYPE);
boolean isHenghuaD5 = StrUtil.equals(codecType, IotHenghuaD5Codec.TYPE);
// 3. 解码设备消息
byte[] bytes = context.body().buffer().getBytes();
@@ -62,6 +64,9 @@ public class IotHttpUpstreamHandler extends IotHttpAbstractHandler {
if (isPeopleCounter) {
return writePeopleCounterResponse(context);
}
if (isHenghuaD5) {
return writeHenghuaD5Response(context);
}
return CommonResult.error(400, "消息解码失败");
}
@@ -69,7 +74,7 @@ public class IotHttpUpstreamHandler extends IotHttpAbstractHandler {
// 5. 发送消息到消息队列
// 注意HTTP 是短连接协议,响应直接通过 HTTP 返回,不需要记录回复消息
// 所以 serverId 传 null让系统不记录回复消<EFBFBD><EFBFBD>参见 IotDeviceMessageServiceImpl.handleUpstreamDeviceMessage 第186行
// 所以 serverId 传 null让系统不记录回复消
deviceMessageService.sendDeviceMessage(message,
productKey, deviceName, null);
@@ -77,6 +82,9 @@ public class IotHttpUpstreamHandler extends IotHttpAbstractHandler {
if (isPeopleCounter) {
return writePeopleCounterResponse(context);
}
if (isHenghuaD5) {
return writeHenghuaD5Response(context);
}
return CommonResult.success(MapUtil.of("messageId", message.getId()));
}
@@ -89,4 +97,12 @@ public class IotHttpUpstreamHandler extends IotHttpAbstractHandler {
return null; // 返回 null 表示响应已写入
}
/**
* 写入恒华D5客流摄像机响应
*/
private CommonResult<Object> writeHenghuaD5Response(RoutingContext context) {
writeResponse(context, MapUtil.<String, Object>builder("code", 0).put("msg", "success").build());
return null;
}
}

View File

@@ -12,6 +12,27 @@ import lombok.Data;
@Data
public class TrafficThresholdConfig {
/**
* 上报模式INCREMENTAL / CUMULATIVE
*/
public static final String REPORT_MODE_INCREMENTAL = "INCREMENTAL";
public static final String REPORT_MODE_CUMULATIVE = "CUMULATIVE";
/**
* 设备上报模式
* <p>
* INCREMENTAL默认设备上报增量值直接使用
* CUMULATIVE设备上报累计值需计算差值得到增量
*/
private String reportMode;
/**
* 是否累计值模式
*/
public boolean isCumulative() {
return REPORT_MODE_CUMULATIVE.equalsIgnoreCase(reportMode);
}
/**
* 触发阈值
* <p>

View File

@@ -244,6 +244,58 @@ public class TrafficCounterRedisDAO {
return null;
}
// ==================== 累计值设备上次值存取 ====================
/**
* 累计值设备上次上报值 Key 模式
* <p>
* 格式iot:clean:traffic:lastvalue:{deviceId}
* Hash Fieldpeople_in / people_out
*/
private static final String LAST_VALUE_KEY_PATTERN = "iot:clean:traffic:lastvalue:%s";
/**
* 累计值 TTL- 7 天,跨天不丢失,设备离线数天后仍能衔接
*/
private static final int LAST_VALUE_TTL_SECONDS = 604800;
/**
* 获取设备上次上报的累计值
*
* @param deviceId 设备ID
* @param identifier 属性标识符people_in / people_out
* @return 上次累计值,首次上报时返回 null
*/
public Long getLastCumulativeValue(Long deviceId, String identifier) {
String key = String.format(LAST_VALUE_KEY_PATTERN, deviceId);
Object value = stringRedisTemplate.opsForHash().get(key, identifier);
if (value == null) {
return null;
}
try {
return Long.parseLong(value.toString());
} catch (NumberFormatException e) {
return null;
}
}
/**
* 更新设备上次上报的累计值
*
* @param deviceId 设备ID
* @param identifier 属性标识符people_in / people_out
* @param value 当前累计值
*/
public void setLastCumulativeValue(Long deviceId, String identifier, long value) {
String key = String.format(LAST_VALUE_KEY_PATTERN, deviceId);
stringRedisTemplate.opsForHash().put(key, identifier, String.valueOf(value));
// 仅在 key 无 TTL 时设置,避免高频上报场景下每次都执行 EXPIRE
Long ttl = stringRedisTemplate.getExpire(key, TimeUnit.SECONDS);
if (ttl == null || ttl == -1) {
stringRedisTemplate.expire(key, LAST_VALUE_TTL_SECONDS, TimeUnit.SECONDS);
}
}
// ==================== 私有方法 ====================
private static String formatThresholdKey(Long deviceId, Long areaId) {

View File

@@ -50,12 +50,13 @@ public class TrafficThresholdRuleProcessor {
* - people_in累加到当日统计 + 阈值计数器(需配置)
* - people_out累加到当日统计
* <p>
* 当日累积统计(用于报表)与工单触发(需配置)解耦
* 即使设备未配置工单触发规则,统计数据也会正常采集。
* 支持两种上报模式(通过 configData.trafficThreshold.reportMode 配置):
* - INCREMENTAL默认上报值直接作为增量
* - CUMULATIVE上报值为累计值自动计算差值得到增量
*
* @param deviceId 设备ID
* @param identifier 属性标识符people_in 或 people_out
* @param propertyValue 属性值(周期内增量)
* @param propertyValue 属性值(增量或累计值,取决于 reportMode
*/
public void processPropertyChange(Long deviceId, String identifier, Object propertyValue) {
// 1. 校验属性类型
@@ -66,15 +67,28 @@ public class TrafficThresholdRuleProcessor {
log.debug("[TrafficThreshold] 收到客流属性deviceId={}, identifier={}, value={}",
deviceId, identifier, propertyValue);
// 2. 解析增量
Long increment = parseTrafficCount(propertyValue);
if (increment == null || increment <= 0) {
log.debug("[TrafficThreshold] 增量值无效deviceId={}, identifier={}, value={}",
deviceId, identifier, propertyValue);
// 2. 解析原始
Long rawValue = parseTrafficCount(propertyValue);
if (rawValue == null || rawValue <= 0) {
return;
}
// 3. 无条件累加到当日统计(统计与工单触发解耦)
// 3. 获取配置,判断上报模式
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper = getConfigWrapper(deviceId);
TrafficThresholdConfig thresholdConfig = resolveThresholdConfig(configWrapper);
// 4. 根据上报模式计算增量
long increment;
if (thresholdConfig != null && thresholdConfig.isCumulative()) {
increment = resolveIncrement(deviceId, identifier, rawValue);
} else {
increment = rawValue;
}
if (increment <= 0) {
return;
}
// 5. 累加到当日统计(统计与工单触发解耦)
LocalDate today = LocalDate.now();
if ("people_in".equals(identifier)) {
trafficCounterRedisDAO.incrementDaily(deviceId, today, increment, 0);
@@ -84,21 +98,11 @@ public class TrafficThresholdRuleProcessor {
log.debug("[TrafficThreshold] 当日统计累加deviceId={}, identifier={}, increment={}",
deviceId, identifier, increment);
// 4. 以下为工单触发逻辑,需要设备配置支持
// 6. 以下为工单触发逻辑,仅 people_in 参与
if (!"people_in".equals(identifier)) {
return; // people_out 不参与阈值判定
}
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper =
getConfigWrapper(deviceId);
if (configWrapper == null || configWrapper.getConfig() == null) {
log.debug("[TrafficThreshold] 设备无工单触发配置deviceId={}", deviceId);
return;
}
TrafficThresholdConfig thresholdConfig = configWrapper.getConfig().getTrafficThreshold();
if (thresholdConfig == null || !thresholdConfig.getAutoCreateOrder()) {
log.debug("[TrafficThreshold] 未启用客流阈值触发deviceId={}", deviceId);
if (thresholdConfig == null || !Boolean.TRUE.equals(thresholdConfig.getAutoCreateOrder())) {
return;
}
@@ -106,10 +110,57 @@ public class TrafficThresholdRuleProcessor {
handlePeopleIn(deviceId, areaId, increment, today, thresholdConfig, configWrapper);
}
/**
* 从配置包装器中提取客流阈值配置
*
* @return 阈值配置,无配置时返回 null
*/
private TrafficThresholdConfig resolveThresholdConfig(
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper) {
if (configWrapper == null || configWrapper.getConfig() == null) {
return null;
}
return configWrapper.getConfig().getTrafficThreshold();
}
/**
* 累计值转增量
* <p>
* 通过 Redis 存储上次上报的累计值,计算差值得到本次增量。
* 处理三种场景:首次上报、正常递增、设备重启归零。
*
* @param deviceId 设备ID
* @param identifier 属性标识符
* @param currentValue 本次上报的累计值
* @return 增量值;首次上报返回 0
*/
private long resolveIncrement(Long deviceId, String identifier, long currentValue) {
Long lastValue = trafficCounterRedisDAO.getLastCumulativeValue(deviceId, identifier);
// 无论是否能算出增量,都记录当前值
trafficCounterRedisDAO.setLastCumulativeValue(deviceId, identifier, currentValue);
if (lastValue == null) {
// 首次上报:无历史基准,不计入统计
log.info("[TrafficThreshold] 累计值设备首次上报建立基准deviceId={}, identifier={}, value={}",
deviceId, identifier, currentValue);
return 0;
}
if (currentValue >= lastValue) {
return currentValue - lastValue;
}
// currentValue < lastValue → 设备重启归零
log.info("[TrafficThreshold] 检测到设备重启deviceId={}, identifier={}, last={}, current={}",
deviceId, identifier, lastValue, currentValue);
return currentValue;
}
/**
* 处理 people_in 增量
*/
private void handlePeopleIn(Long deviceId, Long areaId, Long increment, LocalDate today,
private void handlePeopleIn(Long deviceId, Long areaId, long increment, LocalDate today,
TrafficThresholdConfig thresholdConfig,
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper) {
// 1. 原子累加到阈值计数器,返回累积值(当日统计已在 processPropertyChange 中完成)

View File

@@ -204,7 +204,7 @@ public class CleanBadgeServiceImpl implements CleanBadgeService {
}
try {
OpsBusAreaDO area = opsBusAreaMapper.selectById(areaId);
return area != null ? area.queryAreaNameById() : null;
return area != null ? area.getAreaName() : null;
} catch (Exception e) {
log.warn("[getBadgeRealtimeStatus] 查询区域名称失败: areaId={}", areaId, e);
return null;