diff --git a/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/codec/people/IotPeopleCounterCodec.java b/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/codec/people/IotPeopleCounterCodec.java index 82ff117..739568b 100644 --- a/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/codec/people/IotPeopleCounterCodec.java +++ b/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/codec/people/IotPeopleCounterCodec.java @@ -16,6 +16,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.time.LocalDateTime; import java.util.Comparator; import java.util.List; @@ -59,6 +60,7 @@ public class IotPeopleCounterCodec implements IotDeviceMessageCodec { // 3. 构建属性参数 Map (params) Map params = MapUtil.newHashMap(); // 3.1 基础属性 + params.put("uuid", payload.getUuid()); params.put("rssi", payload.getRssi()); params.put("model", payload.getModel()); params.put("version", payload.getVersion()); @@ -77,20 +79,26 @@ public class IotPeopleCounterCodec implements IotDeviceMessageCodec { params.put("battery_rx", latestData.getRxBat()); params.put("battery_tx", latestData.getTxBat()); - // 解析上报时间(改为使用本地当前时间) -// if (StrUtil.isNotBlank(latestData.getTime())) { -// try { -// // 格式:20250501093000 -> yyyyMMddHHmmss -// reportTime = LocalDateTimeUtil.parse(latestData.getTime(), DatePattern.PURE_DATETIME_PATTERN); -// } catch (Exception e) { -// log.warn("[decode][时间格式解析失败: {}]", latestData.getTime()); -// } -// } + // 解析上报时间,格式:YYYYMMDDHHmmSS + if (StrUtil.isNotBlank(latestData.getTime())) { + try { + LocalDateTime deviceTime = LocalDateTimeUtil.parse(latestData.getTime(), DatePattern.PURE_DATETIME_PATTERN); + // 校验:如果时间超过当前时间24小时以上,舍弃该数据 + long hoursDiff = Math.abs(Duration.between(deviceTime, LocalDateTime.now()).toHours()); + if (hoursDiff > 24) { + log.warn("[decode][设备时间超过24小时,舍弃数据: {},当前时间差: {}小时]", latestData.getTime(), hoursDiff); + return null; + } + reportTime = deviceTime; + } catch (Exception e) { + log.warn("[decode][时间格式解析失败: {}]", latestData.getTime()); + } + } } // 4. 构建 IotDeviceMessage return IotDeviceMessage.requestOf(IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), params) - .setReportTime(LocalDateTime.now()); + .setReportTime(reportTime); } @Override diff --git a/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/protocol/http/router/IotHttpUpstreamHandler.java b/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/protocol/http/router/IotHttpUpstreamHandler.java index 364a746..6d4a3b1 100644 --- a/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/protocol/http/router/IotHttpUpstreamHandler.java +++ b/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/protocol/http/router/IotHttpUpstreamHandler.java @@ -47,30 +47,46 @@ public class IotHttpUpstreamHandler extends IotHttpAbstractHandler { String deviceName = context.pathParam("deviceName"); String method = context.pathParam("*").replaceAll(StrPool.SLASH, StrPool.DOT); - // 2. 解码设备消息 + // 2. 获取设备信息(提前获取,避免重复调用) + IotDeviceRespDTO device = deviceService.getDeviceFromCache(productKey, deviceName); + boolean isPeopleCounter = device != null + && StrUtil.equals(device.getCodecType(), IotPeopleCounterCodec.TYPE); + + // 3. 解码设备消息 byte[] bytes = context.body().buffer().getBytes(); IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(bytes, productKey, deviceName); + + // 4. 解码失败处理 + if (message == null) { + if (isPeopleCounter) { + return writePeopleCounterResponse(context); + } + return CommonResult.error(400, "消息解码失败"); + } + Assert.equals(method, message.getMethod(), "method 不匹配"); - // 3. 发送消息到消息队列 + // 5. 发送消息到消息队列 // 注意:HTTP 是短连接协议,响应直接通过 HTTP 返回,不需要记录回复消息 - // 所以 serverId 传 null,让系统不记录回复消息(参见 IotDeviceMessageServiceImpl.handleUpstreamDeviceMessage 第186行) + // 所以 serverId 传 null,让系统不记录回复消��(参见 IotDeviceMessageServiceImpl.handleUpstreamDeviceMessage 第186行) deviceMessageService.sendDeviceMessage(message, productKey, deviceName, null); - // 4. 构建响应 - // 4.1 特殊处理:客流计数器设备返回自定义格式 - IotDeviceRespDTO device = deviceService.getDeviceFromCache(productKey, deviceName); - if (device != null && StrUtil.equals(device.getCodecType(), IotPeopleCounterCodec.TYPE)) { - // 客流计数器:返回自定义响应格式 - PeopleCounterUploadResp resp = PeopleCounterUploadResp.createDefault(); - writeResponse(context, resp); - return null; // 返回 null 表示响应已写入 + // 6. 构建响应 + if (isPeopleCounter) { + return writePeopleCounterResponse(context); } - - // 4.2 默认:返回标准 CommonResult 格式 return CommonResult.success(MapUtil.of("messageId", message.getId())); } -} \ No newline at end of file + /** + * 写入客流计数器自定义响应 + */ + private CommonResult writePeopleCounterResponse(RoutingContext context) { + PeopleCounterUploadResp resp = PeopleCounterUploadResp.createDefault(); + writeResponse(context, resp); + return null; // 返回 null 表示响应已写入 + } + +}