Compare commits
3 Commits
feature/vi
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
| 1ca472ea93 | |||
| c8ba3e63cb | |||
| 04c61a41db |
@@ -0,0 +1,112 @@
|
||||
package com.viewsh.module.iot.gateway.codec.henghua;
|
||||
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import com.viewsh.framework.common.util.json.JsonUtils;
|
||||
import com.viewsh.module.iot.core.enums.IotDeviceMessageMethodEnum;
|
||||
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import com.viewsh.module.iot.gateway.codec.IotDeviceMessageCodec;
|
||||
import com.viewsh.module.iot.gateway.codec.henghua.dto.HenghuaD5DataReqVO;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.net.URLDecoder;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 恒华 D5 客流摄像机 编解码器
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class IotHenghuaD5Codec implements IotDeviceMessageCodec {
|
||||
|
||||
public static final String TYPE = "HENGHUA_D5";
|
||||
|
||||
@Override
|
||||
public String type() {
|
||||
return TYPE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IotDeviceMessage decode(byte[] bytes) {
|
||||
// 1. 将 byte[] 转为 UTF-8 字符串
|
||||
String body = new String(bytes, StandardCharsets.UTF_8);
|
||||
|
||||
// 2. 按 & 拆分参数,构建 key-value Map(value 做 URL decode)
|
||||
Map<String, String> params = new HashMap<>();
|
||||
for (String entry : body.split("&")) {
|
||||
int idx = entry.indexOf('=');
|
||||
if (idx > 0) {
|
||||
String key = URLDecoder.decode(entry.substring(0, idx), StandardCharsets.UTF_8);
|
||||
String value = URLDecoder.decode(entry.substring(idx + 1), StandardCharsets.UTF_8);
|
||||
params.put(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
// 3. 提取 status:等于 "0" 时为心跳,直接返回 null
|
||||
String status = params.get("status");
|
||||
if ("0".equals(status)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// 4. 提取 type:当前只对接 type=1
|
||||
String type = params.get("type");
|
||||
if (!"1".equals(type)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// 5. 提取 data 参数(已在步骤 2 中完成 URL decode)
|
||||
String dataJson = params.get("data");
|
||||
if (dataJson == null) {
|
||||
log.warn("[decode][恒华D5 data 参数缺失]");
|
||||
return null;
|
||||
}
|
||||
|
||||
// 6. 将 JSON 字符串解析为 HenghuaD5DataReqVO
|
||||
HenghuaD5DataReqVO reqVO;
|
||||
try {
|
||||
reqVO = JsonUtils.parseObject(dataJson, HenghuaD5DataReqVO.class);
|
||||
} catch (Exception e) {
|
||||
log.error("[decode][解析恒华D5 data JSON 失败]", e);
|
||||
throw new IllegalArgumentException("data JSON 格式错误");
|
||||
}
|
||||
if (reqVO == null) {
|
||||
log.warn("[decode][恒华D5 data 解析结果为空]");
|
||||
return null;
|
||||
}
|
||||
|
||||
// 7. 构建属性 Map
|
||||
Map<String, Object> propertyParams = MapUtil.newHashMap();
|
||||
propertyParams.put("people_in", reqVO.getInNum());
|
||||
propertyParams.put("people_out", reqVO.getOutNum());
|
||||
|
||||
// 8. 时间校验:timeStamp 超过 24 小时的数据舍弃
|
||||
LocalDateTime reportTime = LocalDateTime.now();
|
||||
if (reqVO.getTimeStamp() != null) {
|
||||
reportTime = LocalDateTime.ofInstant(
|
||||
Instant.ofEpochSecond(reqVO.getTimeStamp()), ZoneId.systemDefault());
|
||||
long hoursDiff = Math.abs(Duration.between(reportTime, LocalDateTime.now()).toHours());
|
||||
if (hoursDiff > 24) {
|
||||
log.warn("[decode][设备时间超过24小时,舍弃数据: timeStamp={}, 时间差: {}小时]",
|
||||
reqVO.getTimeStamp(), hoursDiff);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
// 9. 返回消息
|
||||
return IotDeviceMessage.requestOf(IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), propertyParams)
|
||||
.setReportTime(reportTime);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] encode(IotDeviceMessage message) {
|
||||
// 恒华D5 不需要下行指令
|
||||
return new byte[0];
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,41 @@
|
||||
package com.viewsh.module.iot.gateway.codec.henghua.dto;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* 恒华 D5 客流摄像机 — type=1 拌线统计数据上报请求
|
||||
*/
|
||||
@Data
|
||||
public class HenghuaD5DataReqVO {
|
||||
|
||||
/**
|
||||
* 时间日期字符串,格式 "YYYY-MM-DD HH:MM:SS"
|
||||
*/
|
||||
@JsonProperty("DataDateTime")
|
||||
private String dataDateTime;
|
||||
|
||||
/**
|
||||
* 进入人数(累计值)
|
||||
*/
|
||||
@JsonProperty("InNum")
|
||||
private Integer inNum;
|
||||
|
||||
/**
|
||||
* 离开人数(累计值)
|
||||
*/
|
||||
@JsonProperty("OutNum")
|
||||
private Integer outNum;
|
||||
|
||||
/**
|
||||
* 经过人数(累计值)
|
||||
*/
|
||||
@JsonProperty("PassNum")
|
||||
private Integer passNum;
|
||||
|
||||
/**
|
||||
* Unix 时间戳(秒)
|
||||
*/
|
||||
private Long timeStamp;
|
||||
|
||||
}
|
||||
@@ -8,6 +8,7 @@ import cn.hutool.extra.spring.SpringUtil;
|
||||
import com.viewsh.framework.common.pojo.CommonResult;
|
||||
import com.viewsh.module.iot.core.biz.dto.IotDeviceRespDTO;
|
||||
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import com.viewsh.module.iot.gateway.codec.henghua.IotHenghuaD5Codec;
|
||||
import com.viewsh.module.iot.gateway.codec.people.IotPeopleCounterCodec;
|
||||
import com.viewsh.module.iot.gateway.codec.people.dto.PeopleCounterUploadResp;
|
||||
import com.viewsh.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol;
|
||||
@@ -49,8 +50,9 @@ public class IotHttpUpstreamHandler extends IotHttpAbstractHandler {
|
||||
|
||||
// 2. 获取设备信息(提前获取,避免重复调用)
|
||||
IotDeviceRespDTO device = deviceService.getDeviceFromCache(productKey, deviceName);
|
||||
boolean isPeopleCounter = device != null
|
||||
&& StrUtil.equals(device.getCodecType(), IotPeopleCounterCodec.TYPE);
|
||||
String codecType = device != null ? device.getCodecType() : null;
|
||||
boolean isPeopleCounter = StrUtil.equals(codecType, IotPeopleCounterCodec.TYPE);
|
||||
boolean isHenghuaD5 = StrUtil.equals(codecType, IotHenghuaD5Codec.TYPE);
|
||||
|
||||
// 3. 解码设备消息
|
||||
byte[] bytes = context.body().buffer().getBytes();
|
||||
@@ -62,6 +64,9 @@ public class IotHttpUpstreamHandler extends IotHttpAbstractHandler {
|
||||
if (isPeopleCounter) {
|
||||
return writePeopleCounterResponse(context);
|
||||
}
|
||||
if (isHenghuaD5) {
|
||||
return writeHenghuaD5Response(context);
|
||||
}
|
||||
return CommonResult.error(400, "消息解码失败");
|
||||
}
|
||||
|
||||
@@ -69,7 +74,7 @@ public class IotHttpUpstreamHandler extends IotHttpAbstractHandler {
|
||||
|
||||
// 5. 发送消息到消息队列
|
||||
// 注意:HTTP 是短连接协议,响应直接通过 HTTP 返回,不需要记录回复消息
|
||||
// 所以 serverId 传 null,让系统不记录回复消<EFBFBD><EFBFBD>(参见 IotDeviceMessageServiceImpl.handleUpstreamDeviceMessage 第186行)
|
||||
// 所以 serverId 传 null,让系统不记录回复消息
|
||||
deviceMessageService.sendDeviceMessage(message,
|
||||
productKey, deviceName, null);
|
||||
|
||||
@@ -77,6 +82,9 @@ public class IotHttpUpstreamHandler extends IotHttpAbstractHandler {
|
||||
if (isPeopleCounter) {
|
||||
return writePeopleCounterResponse(context);
|
||||
}
|
||||
if (isHenghuaD5) {
|
||||
return writeHenghuaD5Response(context);
|
||||
}
|
||||
return CommonResult.success(MapUtil.of("messageId", message.getId()));
|
||||
}
|
||||
|
||||
@@ -89,4 +97,12 @@ public class IotHttpUpstreamHandler extends IotHttpAbstractHandler {
|
||||
return null; // 返回 null 表示响应已写入
|
||||
}
|
||||
|
||||
/**
|
||||
* 写入恒华D5客流摄像机响应
|
||||
*/
|
||||
private CommonResult<Object> writeHenghuaD5Response(RoutingContext context) {
|
||||
writeResponse(context, MapUtil.<String, Object>builder("code", 0).put("msg", "success").build());
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -12,6 +12,27 @@ import lombok.Data;
|
||||
@Data
|
||||
public class TrafficThresholdConfig {
|
||||
|
||||
/**
|
||||
* 上报模式:INCREMENTAL / CUMULATIVE
|
||||
*/
|
||||
public static final String REPORT_MODE_INCREMENTAL = "INCREMENTAL";
|
||||
public static final String REPORT_MODE_CUMULATIVE = "CUMULATIVE";
|
||||
|
||||
/**
|
||||
* 设备上报模式
|
||||
* <p>
|
||||
* INCREMENTAL(默认):设备上报增量值,直接使用
|
||||
* CUMULATIVE:设备上报累计值,需计算差值得到增量
|
||||
*/
|
||||
private String reportMode;
|
||||
|
||||
/**
|
||||
* 是否累计值模式
|
||||
*/
|
||||
public boolean isCumulative() {
|
||||
return REPORT_MODE_CUMULATIVE.equalsIgnoreCase(reportMode);
|
||||
}
|
||||
|
||||
/**
|
||||
* 触发阈值
|
||||
* <p>
|
||||
|
||||
@@ -244,6 +244,58 @@ public class TrafficCounterRedisDAO {
|
||||
return null;
|
||||
}
|
||||
|
||||
// ==================== 累计值设备上次值存取 ====================
|
||||
|
||||
/**
|
||||
* 累计值设备上次上报值 Key 模式
|
||||
* <p>
|
||||
* 格式:iot:clean:traffic:lastvalue:{deviceId}
|
||||
* Hash Field:people_in / people_out
|
||||
*/
|
||||
private static final String LAST_VALUE_KEY_PATTERN = "iot:clean:traffic:lastvalue:%s";
|
||||
|
||||
/**
|
||||
* 累计值 TTL(秒)- 7 天,跨天不丢失,设备离线数天后仍能衔接
|
||||
*/
|
||||
private static final int LAST_VALUE_TTL_SECONDS = 604800;
|
||||
|
||||
/**
|
||||
* 获取设备上次上报的累计值
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @param identifier 属性标识符(people_in / people_out)
|
||||
* @return 上次累计值,首次上报时返回 null
|
||||
*/
|
||||
public Long getLastCumulativeValue(Long deviceId, String identifier) {
|
||||
String key = String.format(LAST_VALUE_KEY_PATTERN, deviceId);
|
||||
Object value = stringRedisTemplate.opsForHash().get(key, identifier);
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
return Long.parseLong(value.toString());
|
||||
} catch (NumberFormatException e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新设备上次上报的累计值
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @param identifier 属性标识符(people_in / people_out)
|
||||
* @param value 当前累计值
|
||||
*/
|
||||
public void setLastCumulativeValue(Long deviceId, String identifier, long value) {
|
||||
String key = String.format(LAST_VALUE_KEY_PATTERN, deviceId);
|
||||
stringRedisTemplate.opsForHash().put(key, identifier, String.valueOf(value));
|
||||
// 仅在 key 无 TTL 时设置,避免高频上报场景下每次都执行 EXPIRE
|
||||
Long ttl = stringRedisTemplate.getExpire(key, TimeUnit.SECONDS);
|
||||
if (ttl == null || ttl == -1) {
|
||||
stringRedisTemplate.expire(key, LAST_VALUE_TTL_SECONDS, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== 私有方法 ====================
|
||||
|
||||
private static String formatThresholdKey(Long deviceId, Long areaId) {
|
||||
|
||||
@@ -50,12 +50,13 @@ public class TrafficThresholdRuleProcessor {
|
||||
* - people_in:累加到当日统计 + 阈值计数器(需配置)
|
||||
* - people_out:累加到当日统计
|
||||
* <p>
|
||||
* 当日累积统计(用于报表)与工单触发(需配置)解耦:
|
||||
* 即使设备未配置工单触发规则,统计数据也会正常采集。
|
||||
* 支持两种上报模式(通过 configData.trafficThreshold.reportMode 配置):
|
||||
* - INCREMENTAL(默认):上报值直接作为增量
|
||||
* - CUMULATIVE:上报值为累计值,自动计算差值得到增量
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @param identifier 属性标识符(people_in 或 people_out)
|
||||
* @param propertyValue 属性值(周期内增量)
|
||||
* @param propertyValue 属性值(增量或累计值,取决于 reportMode)
|
||||
*/
|
||||
public void processPropertyChange(Long deviceId, String identifier, Object propertyValue) {
|
||||
// 1. 校验属性类型
|
||||
@@ -66,15 +67,28 @@ public class TrafficThresholdRuleProcessor {
|
||||
log.debug("[TrafficThreshold] 收到客流属性:deviceId={}, identifier={}, value={}",
|
||||
deviceId, identifier, propertyValue);
|
||||
|
||||
// 2. 解析增量值
|
||||
Long increment = parseTrafficCount(propertyValue);
|
||||
if (increment == null || increment <= 0) {
|
||||
log.debug("[TrafficThreshold] 增量值无效:deviceId={}, identifier={}, value={}",
|
||||
deviceId, identifier, propertyValue);
|
||||
// 2. 解析原始值
|
||||
Long rawValue = parseTrafficCount(propertyValue);
|
||||
if (rawValue == null || rawValue <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. 无条件累加到当日统计(统计与工单触发解耦)
|
||||
// 3. 获取配置,判断上报模式
|
||||
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper = getConfigWrapper(deviceId);
|
||||
TrafficThresholdConfig thresholdConfig = resolveThresholdConfig(configWrapper);
|
||||
|
||||
// 4. 根据上报模式计算增量
|
||||
long increment;
|
||||
if (thresholdConfig != null && thresholdConfig.isCumulative()) {
|
||||
increment = resolveIncrement(deviceId, identifier, rawValue);
|
||||
} else {
|
||||
increment = rawValue;
|
||||
}
|
||||
if (increment <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 5. 累加到当日统计(统计与工单触发解耦)
|
||||
LocalDate today = LocalDate.now();
|
||||
if ("people_in".equals(identifier)) {
|
||||
trafficCounterRedisDAO.incrementDaily(deviceId, today, increment, 0);
|
||||
@@ -84,21 +98,11 @@ public class TrafficThresholdRuleProcessor {
|
||||
log.debug("[TrafficThreshold] 当日统计累加:deviceId={}, identifier={}, increment={}",
|
||||
deviceId, identifier, increment);
|
||||
|
||||
// 4. 以下为工单触发逻辑,需要设备配置支持
|
||||
// 6. 以下为工单触发逻辑,仅 people_in 参与
|
||||
if (!"people_in".equals(identifier)) {
|
||||
return; // people_out 不参与阈值判定
|
||||
}
|
||||
|
||||
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper =
|
||||
getConfigWrapper(deviceId);
|
||||
if (configWrapper == null || configWrapper.getConfig() == null) {
|
||||
log.debug("[TrafficThreshold] 设备无工单触发配置:deviceId={}", deviceId);
|
||||
return;
|
||||
}
|
||||
|
||||
TrafficThresholdConfig thresholdConfig = configWrapper.getConfig().getTrafficThreshold();
|
||||
if (thresholdConfig == null || !thresholdConfig.getAutoCreateOrder()) {
|
||||
log.debug("[TrafficThreshold] 未启用客流阈值触发:deviceId={}", deviceId);
|
||||
if (thresholdConfig == null || !Boolean.TRUE.equals(thresholdConfig.getAutoCreateOrder())) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -106,10 +110,57 @@ public class TrafficThresholdRuleProcessor {
|
||||
handlePeopleIn(deviceId, areaId, increment, today, thresholdConfig, configWrapper);
|
||||
}
|
||||
|
||||
/**
|
||||
* 从配置包装器中提取客流阈值配置
|
||||
*
|
||||
* @return 阈值配置,无配置时返回 null
|
||||
*/
|
||||
private TrafficThresholdConfig resolveThresholdConfig(
|
||||
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper) {
|
||||
if (configWrapper == null || configWrapper.getConfig() == null) {
|
||||
return null;
|
||||
}
|
||||
return configWrapper.getConfig().getTrafficThreshold();
|
||||
}
|
||||
|
||||
/**
|
||||
* 累计值转增量
|
||||
* <p>
|
||||
* 通过 Redis 存储上次上报的累计值,计算差值得到本次增量。
|
||||
* 处理三种场景:首次上报、正常递增、设备重启归零。
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @param identifier 属性标识符
|
||||
* @param currentValue 本次上报的累计值
|
||||
* @return 增量值;首次上报返回 0
|
||||
*/
|
||||
private long resolveIncrement(Long deviceId, String identifier, long currentValue) {
|
||||
Long lastValue = trafficCounterRedisDAO.getLastCumulativeValue(deviceId, identifier);
|
||||
|
||||
// 无论是否能算出增量,都记录当前值
|
||||
trafficCounterRedisDAO.setLastCumulativeValue(deviceId, identifier, currentValue);
|
||||
|
||||
if (lastValue == null) {
|
||||
// 首次上报:无历史基准,不计入统计
|
||||
log.info("[TrafficThreshold] 累计值设备首次上报,建立基准:deviceId={}, identifier={}, value={}",
|
||||
deviceId, identifier, currentValue);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (currentValue >= lastValue) {
|
||||
return currentValue - lastValue;
|
||||
}
|
||||
|
||||
// currentValue < lastValue → 设备重启归零
|
||||
log.info("[TrafficThreshold] 检测到设备重启:deviceId={}, identifier={}, last={}, current={}",
|
||||
deviceId, identifier, lastValue, currentValue);
|
||||
return currentValue;
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理 people_in 增量
|
||||
*/
|
||||
private void handlePeopleIn(Long deviceId, Long areaId, Long increment, LocalDate today,
|
||||
private void handlePeopleIn(Long deviceId, Long areaId, long increment, LocalDate today,
|
||||
TrafficThresholdConfig thresholdConfig,
|
||||
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper) {
|
||||
// 1. 原子累加到阈值计数器,返回累积值(当日统计已在 processPropertyChange 中完成)
|
||||
|
||||
@@ -204,7 +204,7 @@ public class CleanBadgeServiceImpl implements CleanBadgeService {
|
||||
}
|
||||
try {
|
||||
OpsBusAreaDO area = opsBusAreaMapper.selectById(areaId);
|
||||
return area != null ? area.queryAreaNameById() : null;
|
||||
return area != null ? area.getAreaName() : null;
|
||||
} catch (Exception e) {
|
||||
log.warn("[getBadgeRealtimeStatus] 查询区域名称失败: areaId={}", areaId, e);
|
||||
return null;
|
||||
|
||||
Reference in New Issue
Block a user