diff --git a/viewshanghai-module-iot/viewshanghai-module-iot-biz/src/main/java/com/viewshanghai/module/iot/service/device/message/IotDeviceMessageServiceImpl.java b/viewshanghai-module-iot/viewshanghai-module-iot-biz/src/main/java/com/viewshanghai/module/iot/service/device/message/IotDeviceMessageServiceImpl.java index a2494f1..ea7bd86 100644 --- a/viewshanghai-module-iot/viewshanghai-module-iot-biz/src/main/java/com/viewshanghai/module/iot/service/device/message/IotDeviceMessageServiceImpl.java +++ b/viewshanghai-module-iot/viewshanghai-module-iot-biz/src/main/java/com/viewshanghai/module/iot/service/device/message/IotDeviceMessageServiceImpl.java @@ -169,10 +169,13 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService { getSelf().createDeviceLogAsync(message); // 3. 回复消息。前提:非 _reply 消息,并且非禁用回复的消息 + // 条件1:防止对"回复消息"再次回复,避免无限循环 + // 条件2:某些特定的method不需要回复(如设备状态变更、OTA进度上报) + // 条件3(新增):HTTP短连接场景,因为已经在请求中直接响应了,不需要再通过消息总线发送回复 if (IotDeviceMessageUtils.isReplyMessage(message) || IotDeviceMessageMethodEnum.isReplyDisabled(message.getMethod()) || StrUtil.isEmpty(message.getServerId())) { - return; + return; // serverId 为空,不记录回复消息 } try { IotDeviceMessage replyMessage = IotDeviceMessage.replyOf(message.getRequestId(), message.getMethod(), replyData, diff --git a/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/http/router/IotHttpUpstreamHandler.java b/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/http/router/IotHttpUpstreamHandler.java index 7ce0d70..025d36e 100644 --- a/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/http/router/IotHttpUpstreamHandler.java +++ b/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/http/router/IotHttpUpstreamHandler.java @@ -3,10 +3,15 @@ package com.viewshanghai.module.iot.gateway.protocol.http.router; import cn.hutool.core.lang.Assert; import cn.hutool.core.map.MapUtil; import cn.hutool.core.text.StrPool; +import cn.hutool.core.util.StrUtil; import cn.hutool.extra.spring.SpringUtil; import com.viewshanghai.framework.common.pojo.CommonResult; +import com.viewshanghai.module.iot.core.biz.dto.IotDeviceRespDTO; import com.viewshanghai.module.iot.core.mq.message.IotDeviceMessage; +import com.viewshanghai.module.iot.gateway.codec.people.IotPeopleCounterCodec; +import com.viewshanghai.module.iot.gateway.codec.people.dto.PeopleCounterUploadResp; import com.viewshanghai.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol; +import com.viewshanghai.module.iot.gateway.service.device.IotDeviceService; import com.viewshanghai.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.ext.web.RoutingContext; import lombok.RequiredArgsConstructor; @@ -27,9 +32,12 @@ public class IotHttpUpstreamHandler extends IotHttpAbstractHandler { private final IotDeviceMessageService deviceMessageService; + private final IotDeviceService deviceService; + public IotHttpUpstreamHandler(IotHttpUpstreamProtocol protocol) { this.protocol = protocol; this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class); + this.deviceService = SpringUtil.getBean(IotDeviceService.class); } @Override @@ -39,16 +47,29 @@ public class IotHttpUpstreamHandler extends IotHttpAbstractHandler { String deviceName = context.pathParam("deviceName"); String method = context.pathParam("*").replaceAll(StrPool.SLASH, StrPool.DOT); - // 2.1 解析消息 + // 2. 解码设备消息 byte[] bytes = context.body().buffer().getBytes(); IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(bytes, productKey, deviceName); Assert.equals(method, message.getMethod(), "method 不匹配"); - // 2.2 发送消息 - deviceMessageService.sendDeviceMessage(message, - productKey, deviceName, protocol.getServerId()); - // 3. 返回结果 + // 3. 发送消息到消息队列 + // 注意:HTTP 是短连接协议,响应直接通过 HTTP 返回,不需要记录回复消息 + // 所以 serverId 传 null,让系统不记录回复消息(参见 IotDeviceMessageServiceImpl.handleUpstreamDeviceMessage 第186行) + deviceMessageService.sendDeviceMessage(message, + productKey, deviceName, null); + + // 4. 构建响应 + // 4.1 特殊处理:客流计数器设备返回自定义格式 + IotDeviceRespDTO device = deviceService.getDeviceFromCache(productKey, deviceName); + if (device != null && StrUtil.equals(device.getCodecType(), IotPeopleCounterCodec.TYPE)) { + // 客流计数器:返回自定义响应格式 + PeopleCounterUploadResp resp = PeopleCounterUploadResp.createDefault(); + writeResponse(context, resp); + return null; // 返回 null 表示响应已写入 + } + + // 4.2 默认:返回标准 CommonResult 格式 return CommonResult.success(MapUtil.of("messageId", message.getId())); }