feat(iot): 新增恒华D5客流摄像机编解码器,对接拌线人数统计(type=1)
Some checks failed
Java CI with Maven / build (11) (push) Has been cancelled
Java CI with Maven / build (17) (push) Has been cancelled
Java CI with Maven / build (8) (push) Has been cancelled

走通用路由,新增 IotHenghuaD5Codec 解析 form-urlencoded 格式数据,
映射 InNum/OutNum 到 people_in/people_out,业务层完全复用现有客流阈值逻辑。
IotHttpUpstreamHandler 增加恒华D5 专用简洁响应。

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
lzh
2026-04-07 14:59:53 +08:00
parent 04c61a41db
commit c8ba3e63cb
3 changed files with 172 additions and 3 deletions

View File

@@ -0,0 +1,112 @@
package com.viewsh.module.iot.gateway.codec.henghua;
import cn.hutool.core.map.MapUtil;
import com.viewsh.framework.common.util.json.JsonUtils;
import com.viewsh.module.iot.core.enums.IotDeviceMessageMethodEnum;
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
import com.viewsh.module.iot.gateway.codec.IotDeviceMessageCodec;
import com.viewsh.module.iot.gateway.codec.henghua.dto.HenghuaD5DataReqVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;
/**
* 恒华 D5 客流摄像机 编解码器
*/
@Slf4j
@Component
public class IotHenghuaD5Codec implements IotDeviceMessageCodec {
public static final String TYPE = "HENGHUA_D5";
@Override
public String type() {
return TYPE;
}
@Override
public IotDeviceMessage decode(byte[] bytes) {
// 1. 将 byte[] 转为 UTF-8 字符串
String body = new String(bytes, StandardCharsets.UTF_8);
// 2. 按 & 拆分参数,构建 key-value Mapvalue 做 URL decode
Map<String, String> params = new HashMap<>();
for (String entry : body.split("&")) {
int idx = entry.indexOf('=');
if (idx > 0) {
String key = URLDecoder.decode(entry.substring(0, idx), StandardCharsets.UTF_8);
String value = URLDecoder.decode(entry.substring(idx + 1), StandardCharsets.UTF_8);
params.put(key, value);
}
}
// 3. 提取 status等于 "0" 时为心跳,直接返回 null
String status = params.get("status");
if ("0".equals(status)) {
return null;
}
// 4. 提取 type当前只对接 type=1
String type = params.get("type");
if (!"1".equals(type)) {
return null;
}
// 5. 提取 data 参数(已在步骤 2 中完成 URL decode
String dataJson = params.get("data");
if (dataJson == null) {
log.warn("[decode][恒华D5 data 参数缺失]");
return null;
}
// 6. 将 JSON 字符串解析为 HenghuaD5DataReqVO
HenghuaD5DataReqVO reqVO;
try {
reqVO = JsonUtils.parseObject(dataJson, HenghuaD5DataReqVO.class);
} catch (Exception e) {
log.error("[decode][解析恒华D5 data JSON 失败]", e);
throw new IllegalArgumentException("data JSON 格式错误");
}
if (reqVO == null) {
log.warn("[decode][恒华D5 data 解析结果为空]");
return null;
}
// 7. 构建属性 Map
Map<String, Object> propertyParams = MapUtil.newHashMap();
propertyParams.put("people_in", reqVO.getInNum());
propertyParams.put("people_out", reqVO.getOutNum());
// 8. 时间校验timeStamp 超过 24 小时的数据舍弃
LocalDateTime reportTime = LocalDateTime.now();
if (reqVO.getTimeStamp() != null) {
reportTime = LocalDateTime.ofInstant(
Instant.ofEpochSecond(reqVO.getTimeStamp()), ZoneId.systemDefault());
long hoursDiff = Math.abs(Duration.between(reportTime, LocalDateTime.now()).toHours());
if (hoursDiff > 24) {
log.warn("[decode][设备时间超过24小时舍弃数据: timeStamp={}, 时间差: {}小时]",
reqVO.getTimeStamp(), hoursDiff);
return null;
}
}
// 9. 返回消息
return IotDeviceMessage.requestOf(IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), propertyParams)
.setReportTime(reportTime);
}
@Override
public byte[] encode(IotDeviceMessage message) {
// 恒华D5 不需要下行指令
return new byte[0];
}
}

View File

@@ -0,0 +1,41 @@
package com.viewsh.module.iot.gateway.codec.henghua.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
/**
* 恒华 D5 客流摄像机 — type=1 拌线统计数据上报请求
*/
@Data
public class HenghuaD5DataReqVO {
/**
* 时间日期字符串,格式 "YYYY-MM-DD HH:MM:SS"
*/
@JsonProperty("DataDateTime")
private String dataDateTime;
/**
* 进入人数(累计值)
*/
@JsonProperty("InNum")
private Integer inNum;
/**
* 离开人数(累计值)
*/
@JsonProperty("OutNum")
private Integer outNum;
/**
* 经过人数(累计值)
*/
@JsonProperty("PassNum")
private Integer passNum;
/**
* Unix 时间戳(秒)
*/
private Long timeStamp;
}

View File

@@ -8,6 +8,7 @@ import cn.hutool.extra.spring.SpringUtil;
import com.viewsh.framework.common.pojo.CommonResult;
import com.viewsh.module.iot.core.biz.dto.IotDeviceRespDTO;
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
import com.viewsh.module.iot.gateway.codec.henghua.IotHenghuaD5Codec;
import com.viewsh.module.iot.gateway.codec.people.IotPeopleCounterCodec;
import com.viewsh.module.iot.gateway.codec.people.dto.PeopleCounterUploadResp;
import com.viewsh.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol;
@@ -49,8 +50,9 @@ public class IotHttpUpstreamHandler extends IotHttpAbstractHandler {
// 2. 获取设备信息(提前获取,避免重复调用)
IotDeviceRespDTO device = deviceService.getDeviceFromCache(productKey, deviceName);
boolean isPeopleCounter = device != null
&& StrUtil.equals(device.getCodecType(), IotPeopleCounterCodec.TYPE);
String codecType = device != null ? device.getCodecType() : null;
boolean isPeopleCounter = StrUtil.equals(codecType, IotPeopleCounterCodec.TYPE);
boolean isHenghuaD5 = StrUtil.equals(codecType, IotHenghuaD5Codec.TYPE);
// 3. 解码设备消息
byte[] bytes = context.body().buffer().getBytes();
@@ -62,6 +64,9 @@ public class IotHttpUpstreamHandler extends IotHttpAbstractHandler {
if (isPeopleCounter) {
return writePeopleCounterResponse(context);
}
if (isHenghuaD5) {
return writeHenghuaD5Response(context);
}
return CommonResult.error(400, "消息解码失败");
}
@@ -69,7 +74,7 @@ public class IotHttpUpstreamHandler extends IotHttpAbstractHandler {
// 5. 发送消息到消息队列
// 注意HTTP 是短连接协议,响应直接通过 HTTP 返回,不需要记录回复消息
// 所以 serverId 传 null让系统不记录回复消<EFBFBD><EFBFBD>参见 IotDeviceMessageServiceImpl.handleUpstreamDeviceMessage 第186行
// 所以 serverId 传 null让系统不记录回复消
deviceMessageService.sendDeviceMessage(message,
productKey, deviceName, null);
@@ -77,6 +82,9 @@ public class IotHttpUpstreamHandler extends IotHttpAbstractHandler {
if (isPeopleCounter) {
return writePeopleCounterResponse(context);
}
if (isHenghuaD5) {
return writeHenghuaD5Response(context);
}
return CommonResult.success(MapUtil.of("messageId", message.getId()));
}
@@ -89,4 +97,12 @@ public class IotHttpUpstreamHandler extends IotHttpAbstractHandler {
return null; // 返回 null 表示响应已写入
}
/**
* 写入恒华D5客流摄像机响应
*/
private CommonResult<Object> writeHenghuaD5Response(RoutingContext context) {
writeResponse(context, MapUtil.<String, Object>builder("code", 0).put("msg", "success").build());
return null;
}
}