From cc6b11f4e99bbcfbe8b17018af35f3366e9978df Mon Sep 17 00:00:00 2001 From: lzh Date: Sun, 8 Feb 2026 00:23:44 +0800 Subject: [PATCH] =?UTF-8?q?feat(iot):=20=E5=AF=B9=E6=8E=A5=203D11=20?= =?UTF-8?q?=E5=8D=95=E7=9B=AE=E5=AE=A2=E6=B5=81=E8=AE=A1=E6=95=B0=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 在 IoT Gateway 的 Vert.x Router 上注册 /api/camera/* 专用路由, 桥接 3D11 摄像头的心跳和数据上报到现有消息总线和编解码体系。 - 新建 Camera3D11 DTO(心跳请求、数据上报请求、统一响应) - 新建 IotCamera3D11Codec 编解码器(TYPE=CAMERA_3D11) - 新建 IotCameraUpstreamHandler 处理心跳和数据上报 - productKey 通过 application.yaml 配置,未配置时不注册路由 - 心跳上报间隔设为 1 分钟 Co-Authored-By: Claude Opus 4.6 --- .../codec/camera3d11/IotCamera3D11Codec.java | 68 ++ .../dto/Camera3D11DataUploadReqVO.java | 44 + .../dto/Camera3D11HeartBeatReqVO.java | 21 + .../codec/camera3d11/dto/Camera3D11Resp.java | 36 + .../gateway/config/IotGatewayProperties.java | 817 +++++++++--------- .../http/IotHttpUpstreamProtocol.java | 181 ++-- .../http/router/IotCameraUpstreamHandler.java | 150 ++++ .../src/main/resources/application.yaml | 1 + 8 files changed, 827 insertions(+), 491 deletions(-) create mode 100644 viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/codec/camera3d11/IotCamera3D11Codec.java create mode 100644 viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/codec/camera3d11/dto/Camera3D11DataUploadReqVO.java create mode 100644 viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/codec/camera3d11/dto/Camera3D11HeartBeatReqVO.java create mode 100644 viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/codec/camera3d11/dto/Camera3D11Resp.java create mode 100644 viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/protocol/http/router/IotCameraUpstreamHandler.java diff --git a/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/codec/camera3d11/IotCamera3D11Codec.java b/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/codec/camera3d11/IotCamera3D11Codec.java new file mode 100644 index 0000000..e4017bd --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/codec/camera3d11/IotCamera3D11Codec.java @@ -0,0 +1,68 @@ +package com.viewsh.module.iot.gateway.codec.camera3d11; + +import cn.hutool.core.lang.Assert; +import cn.hutool.core.map.MapUtil; +import com.viewsh.framework.common.util.json.JsonUtils; +import com.viewsh.module.iot.core.enums.IotDeviceMessageMethodEnum; +import com.viewsh.module.iot.core.mq.message.IotDeviceMessage; +import com.viewsh.module.iot.gateway.codec.IotDeviceMessageCodec; +import com.viewsh.module.iot.gateway.codec.camera3d11.dto.Camera3D11DataUploadReqVO; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.Map; + +/** + * 3D11 单目客流计数器 编解码器 + */ +@Slf4j +@Component +public class IotCamera3D11Codec implements IotDeviceMessageCodec { + + public static final String TYPE = "CAMERA_3D11"; + + @Override + public String type() { + return TYPE; + } + + @Override + public IotDeviceMessage decode(byte[] bytes) { + // 1. 解析 JSON + Camera3D11DataUploadReqVO payload; + try { + payload = JsonUtils.parseObject(bytes, Camera3D11DataUploadReqVO.class); + } catch (Exception e) { + log.error("[decode][解析 3D11 数据失败]", e); + throw new IllegalArgumentException("JSON 格式错误"); + } + Assert.notNull(payload, "消息内容不能为空"); + + // 2. 构建属性参数 + Map params = MapUtil.newHashMap(); + params.put("people_in", payload.getPeopleIn()); + params.put("people_out", payload.getPeopleOut()); + params.put("stat_start_time", payload.getStartTime()); + params.put("stat_end_time", payload.getEndTime()); + + // 3. 使用 endTime 作为上报时间 + LocalDateTime reportTime = LocalDateTime.now(); + if (payload.getEndTime() != null) { + reportTime = LocalDateTime.ofInstant( + Instant.ofEpochSecond(payload.getEndTime()), ZoneId.systemDefault()); + } + + return IotDeviceMessage.requestOf(IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), params) + .setReportTime(reportTime); + } + + @Override + public byte[] encode(IotDeviceMessage message) { + // 3D11 不需要下行指令 + return new byte[0]; + } + +} diff --git a/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/codec/camera3d11/dto/Camera3D11DataUploadReqVO.java b/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/codec/camera3d11/dto/Camera3D11DataUploadReqVO.java new file mode 100644 index 0000000..668c44d --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/codec/camera3d11/dto/Camera3D11DataUploadReqVO.java @@ -0,0 +1,44 @@ +package com.viewsh.module.iot.gateway.codec.camera3d11.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +/** + * 3D11 单目客流计数器 — 数据上报请求 + */ +@Data +public class Camera3D11DataUploadReqVO { + + /** + * 设备序列号 + */ + private String sn; + + /** + * 统计开始时间(Unix 秒) + */ + private Long startTime; + + /** + * 统计结束时间(Unix 秒) + */ + private Long endTime; + + /** + * 上传时间(Unix 秒) + */ + private Long time; + + /** + * 进客流 + */ + @JsonProperty("in") + private Integer peopleIn; + + /** + * 出客流 + */ + @JsonProperty("out") + private Integer peopleOut; + +} diff --git a/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/codec/camera3d11/dto/Camera3D11HeartBeatReqVO.java b/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/codec/camera3d11/dto/Camera3D11HeartBeatReqVO.java new file mode 100644 index 0000000..77f24d4 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/codec/camera3d11/dto/Camera3D11HeartBeatReqVO.java @@ -0,0 +1,21 @@ +package com.viewsh.module.iot.gateway.codec.camera3d11.dto; + +import lombok.Data; + +/** + * 3D11 单目客流计数器 — 心跳请求 + */ +@Data +public class Camera3D11HeartBeatReqVO { + + /** + * 设备序列号 + */ + private String sn; + + /** + * Unix 时间戳(秒) + */ + private Long time; + +} diff --git a/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/codec/camera3d11/dto/Camera3D11Resp.java b/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/codec/camera3d11/dto/Camera3D11Resp.java new file mode 100644 index 0000000..5a5ccec --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/codec/camera3d11/dto/Camera3D11Resp.java @@ -0,0 +1,36 @@ +package com.viewsh.module.iot.gateway.codec.camera3d11.dto; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * 3D11 单目客流计数器 — 统一响应 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class Camera3D11Resp { + + /** + * 响应码:0=成功 1=SN不存在 2=失败 + */ + private Integer code; + + private String msg; + + private Object data; + + public static Camera3D11Resp success(Object data) { + return new Camera3D11Resp(0, "success", data); + } + + public static Camera3D11Resp snNotFound() { + return new Camera3D11Resp(1, "sn不存在", null); + } + + public static Camera3D11Resp error(String msg) { + return new Camera3D11Resp(2, msg, null); + } + +} diff --git a/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/config/IotGatewayProperties.java b/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/config/IotGatewayProperties.java index 8487629..d4a2bd8 100644 --- a/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/config/IotGatewayProperties.java +++ b/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/config/IotGatewayProperties.java @@ -1,405 +1,412 @@ -package com.viewsh.module.iot.gateway.config; - -import jakarta.validation.constraints.NotEmpty; -import jakarta.validation.constraints.NotNull; -import lombok.Data; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.validation.annotation.Validated; - -import java.time.Duration; -import java.util.List; - -@ConfigurationProperties(prefix = "viewsh.iot.gateway") -@Validated -@Data -public class IotGatewayProperties { - - /** - * 设备 RPC 服务配置 - */ - private RpcProperties rpc; - /** - * Token 配置 - */ - private TokenProperties token; - - /** - * 协议配置 - */ - private ProtocolProperties protocol; - - @Data - public static class RpcProperties { - - /** - * 主程序 API 地址 - */ - @NotEmpty(message = "主程序 API 地址不能为空") - private String url; - /** - * 连接超时时间 - */ - @NotNull(message = "连接超时时间不能为空") - private Duration connectTimeout; - /** - * 读取超时时间 - */ - @NotNull(message = "读取超时时间不能为空") - private Duration readTimeout; - - } - - @Data - public static class TokenProperties { - - /** - * 密钥 - */ - @NotEmpty(message = "密钥不能为空") - private String secret; - /** - * 令牌有效期 - */ - @NotNull(message = "令牌有效期不能为空") - private Duration expiration; - - } - - @Data - public static class ProtocolProperties { - - /** - * HTTP 组件配置 - */ - private HttpProperties http; - - /** - * EMQX 组件配置 - */ - private EmqxProperties emqx; - - /** - * TCP 组件配置 - */ - private TcpProperties tcp; - - /** - * MQTT 组件配置 - */ - private MqttProperties mqtt; - - } - - @Data - public static class HttpProperties { - - /** - * 是否开启 - */ - @NotNull(message = "是否开启不能为空") - private Boolean enabled; - /** - * 服务端口 - */ - private Integer serverPort; - - /** - * 是否开启 SSL - */ - @NotNull(message = "是否开启 SSL 不能为空") - private Boolean sslEnabled = false; - - /** - * SSL 证书路径 - */ - private String sslKeyPath; - /** - * SSL 证书路径 - */ - private String sslCertPath; - - } - - @Data - public static class EmqxProperties { - - /** - * 是否开启 - */ - @NotNull(message = "是否开启不能为空") - private Boolean enabled; - - /** - * HTTP 服务端口(默认:8090) - */ - private Integer httpPort = 8090; - - /** - * MQTT 服务器地址 - */ - @NotEmpty(message = "MQTT 服务器地址不能为空") - private String mqttHost; - - /** - * MQTT 服务器端口(默认:1883) - */ - @NotNull(message = "MQTT 服务器端口不能为空") - private Integer mqttPort = 1883; - - /** - * MQTT 用户名 - */ - @NotEmpty(message = "MQTT 用户名不能为空") - private String mqttUsername; - - /** - * MQTT 密码 - */ - @NotEmpty(message = "MQTT 密码不能为空") - private String mqttPassword; - - /** - * MQTT 客户端的 SSL 开关 - */ - @NotNull(message = "MQTT 是否开启 SSL 不能为空") - private Boolean mqttSsl = false; - - /** - * MQTT 客户端 ID(如果为空,系统将自动生成) - */ - @NotEmpty(message = "MQTT 客户端 ID 不能为空") - private String mqttClientId; - - /** - * MQTT 订阅的主题 - */ - @NotEmpty(message = "MQTT 主题不能为空") - private List<@NotEmpty(message = "MQTT 主题不能为空") String> mqttTopics; - - /** - * 默认 QoS 级别 - *

- * 0 - 最多一次 - * 1 - 至少一次 - * 2 - 刚好一次 - */ - private Integer mqttQos = 1; - - /** - * 连接超时时间(秒) - */ - private Integer connectTimeoutSeconds = 10; - - /** - * 重连延迟时间(毫秒) - */ - private Long reconnectDelayMs = 5000L; - - /** - * 是否启用 Clean Session (清理会话) - * true: 每次连接都是新会话,Broker 不保留离线消息和订阅关系。 - * 对于网关这类“永远在线”且会主动重新订阅的应用,建议为 true。 - */ - private Boolean cleanSession = true; - - /** - * 心跳间隔(秒) - * 用于保持连接活性,及时发现网络中断。 - */ - private Integer keepAliveIntervalSeconds = 60; - - /** - * 最大未确认消息队列大小 - * 限制已发送但未收到 Broker 确认的 QoS 1/2 消息数量,用于流量控制。 - */ - private Integer maxInflightQueue = 10000; - - /** - * 是否信任所有 SSL 证书 - * 警告:此配置会绕过证书验证,仅建议在开发和测试环境中使用! - * 在生产环境中,应设置为 false,并配置正确的信任库。 - */ - private Boolean trustAll = false; - - /** - * 遗嘱消息配置 (用于网关异常下线时通知其他系统) - */ - private final Will will = new Will(); - - /** - * 高级 SSL/TLS 配置 (用于生产环境) - */ - private final Ssl sslOptions = new Ssl(); - - /** - * 遗嘱消息 (Last Will and Testament) - */ - @Data - public static class Will { - - /** - * 是否启用遗嘱消息 - */ - private boolean enabled = false; - /** - * 遗嘱消息主题 - */ - private String topic; - /** - * 遗嘱消息内容 - */ - private String payload; - /** - * 遗嘱消息 QoS 等级 - */ - private Integer qos = 1; - /** - * 遗嘱消息是否作为保留消息发布 - */ - private boolean retain = true; - - } - - /** - * 高级 SSL/TLS 配置 - */ - @Data - public static class Ssl { - - /** - * 密钥库(KeyStore)路径,例如:classpath:certs/client.jks - * 包含客户端自己的证书和私钥,用于向服务端证明身份(双向认证)。 - */ - private String keyStorePath; - /** - * 密钥库密码 - */ - private String keyStorePassword; - /** - * 信任库(TrustStore)路径,例如:classpath:certs/trust.jks - * 包含服务端信任的 CA 证书,用于验证服务端的身份,防止中间人攻击。 - */ - private String trustStorePath; - /** - * 信任库密码 - */ - private String trustStorePassword; - - } - - } - - @Data - public static class TcpProperties { - - /** - * 是否开启 - */ - @NotNull(message = "是否开启不能为空") - private Boolean enabled; - - /** - * 服务器端口 - */ - private Integer port = 8091; - - /** - * 心跳超时时间(毫秒) - */ - private Long keepAliveTimeoutMs = 30000L; - - /** - * 最大连接数 - */ - private Integer maxConnections = 1000; - - /** - * 是否启用SSL - */ - private Boolean sslEnabled = false; - - /** - * SSL证书路径 - */ - private String sslCertPath; - - /** - * SSL私钥路径 - */ - private String sslKeyPath; - - } - - @Data - public static class MqttProperties { - - /** - * 是否开启 - */ - @NotNull(message = "是否开启不能为空") - private Boolean enabled; - - /** - * 服务器端口 - */ - private Integer port = 1883; - - /** - * 最大消息大小(字节) - */ - private Integer maxMessageSize = 8192; - - /** - * 连接超时时间(秒) - */ - private Integer connectTimeoutSeconds = 60; - /** - * 保持连接超时时间(秒) - */ - private Integer keepAliveTimeoutSeconds = 300; - - /** - * 是否启用 SSL - */ - private Boolean sslEnabled = false; - /** - * SSL 配置 - */ - private SslOptions sslOptions = new SslOptions(); - - /** - * SSL 配置选项 - */ - @Data - public static class SslOptions { - - /** - * 密钥证书选项 - */ - private io.vertx.core.net.KeyCertOptions keyCertOptions; - /** - * 信任选项 - */ - private io.vertx.core.net.TrustOptions trustOptions; - /** - * SSL 证书路径 - */ - private String certPath; - /** - * SSL 私钥路径 - */ - private String keyPath; - /** - * 信任存储路径 - */ - private String trustStorePath; - /** - * 信任存储密码 - */ - private String trustStorePassword; - - } - - } - -} +package com.viewsh.module.iot.gateway.config; + +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.validation.annotation.Validated; + +import java.time.Duration; +import java.util.List; + +@ConfigurationProperties(prefix = "viewsh.iot.gateway") +@Validated +@Data +public class IotGatewayProperties { + + /** + * 设备 RPC 服务配置 + */ + private RpcProperties rpc; + /** + * Token 配置 + */ + private TokenProperties token; + + /** + * 协议配置 + */ + private ProtocolProperties protocol; + + @Data + public static class RpcProperties { + + /** + * 主程序 API 地址 + */ + @NotEmpty(message = "主程序 API 地址不能为空") + private String url; + /** + * 连接超时时间 + */ + @NotNull(message = "连接超时时间不能为空") + private Duration connectTimeout; + /** + * 读取超时时间 + */ + @NotNull(message = "读取超时时间不能为空") + private Duration readTimeout; + + } + + @Data + public static class TokenProperties { + + /** + * 密钥 + */ + @NotEmpty(message = "密钥不能为空") + private String secret; + /** + * 令牌有效期 + */ + @NotNull(message = "令牌有效期不能为空") + private Duration expiration; + + } + + @Data + public static class ProtocolProperties { + + /** + * HTTP 组件配置 + */ + private HttpProperties http; + + /** + * EMQX 组件配置 + */ + private EmqxProperties emqx; + + /** + * TCP 组件配置 + */ + private TcpProperties tcp; + + /** + * MQTT 组件配置 + */ + private MqttProperties mqtt; + + } + + @Data + public static class HttpProperties { + + /** + * 是否开启 + */ + @NotNull(message = "是否开启不能为空") + private Boolean enabled; + /** + * 服务端口 + */ + private Integer serverPort; + + /** + * 是否开启 SSL + */ + @NotNull(message = "是否开启 SSL 不能为空") + private Boolean sslEnabled = false; + + /** + * SSL 证书路径 + */ + private String sslKeyPath; + /** + * SSL 证书路径 + */ + private String sslCertPath; + + /** + * 3D11 单目客流计数器的产品 Key + * + * 设备通过 /api/camera/* 路径上报,需要配置此值用于设备查找 + */ + private String camera3d11ProductKey; + + } + + @Data + public static class EmqxProperties { + + /** + * 是否开启 + */ + @NotNull(message = "是否开启不能为空") + private Boolean enabled; + + /** + * HTTP 服务端口(默认:8090) + */ + private Integer httpPort = 8090; + + /** + * MQTT 服务器地址 + */ + @NotEmpty(message = "MQTT 服务器地址不能为空") + private String mqttHost; + + /** + * MQTT 服务器端口(默认:1883) + */ + @NotNull(message = "MQTT 服务器端口不能为空") + private Integer mqttPort = 1883; + + /** + * MQTT 用户名 + */ + @NotEmpty(message = "MQTT 用户名不能为空") + private String mqttUsername; + + /** + * MQTT 密码 + */ + @NotEmpty(message = "MQTT 密码不能为空") + private String mqttPassword; + + /** + * MQTT 客户端的 SSL 开关 + */ + @NotNull(message = "MQTT 是否开启 SSL 不能为空") + private Boolean mqttSsl = false; + + /** + * MQTT 客户端 ID(如果为空,系统将自动生成) + */ + @NotEmpty(message = "MQTT 客户端 ID 不能为空") + private String mqttClientId; + + /** + * MQTT 订阅的主题 + */ + @NotEmpty(message = "MQTT 主题不能为空") + private List<@NotEmpty(message = "MQTT 主题不能为空") String> mqttTopics; + + /** + * 默认 QoS 级别 + *

+ * 0 - 最多一次 + * 1 - 至少一次 + * 2 - 刚好一次 + */ + private Integer mqttQos = 1; + + /** + * 连接超时时间(秒) + */ + private Integer connectTimeoutSeconds = 10; + + /** + * 重连延迟时间(毫秒) + */ + private Long reconnectDelayMs = 5000L; + + /** + * 是否启用 Clean Session (清理会话) + * true: 每次连接都是新会话,Broker 不保留离线消息和订阅关系。 + * 对于网关这类“永远在线”且会主动重新订阅的应用,建议为 true。 + */ + private Boolean cleanSession = true; + + /** + * 心跳间隔(秒) + * 用于保持连接活性,及时发现网络中断。 + */ + private Integer keepAliveIntervalSeconds = 60; + + /** + * 最大未确认消息队列大小 + * 限制已发送但未收到 Broker 确认的 QoS 1/2 消息数量,用于流量控制。 + */ + private Integer maxInflightQueue = 10000; + + /** + * 是否信任所有 SSL 证书 + * 警告:此配置会绕过证书验证,仅建议在开发和测试环境中使用! + * 在生产环境中,应设置为 false,并配置正确的信任库。 + */ + private Boolean trustAll = false; + + /** + * 遗嘱消息配置 (用于网关异常下线时通知其他系统) + */ + private final Will will = new Will(); + + /** + * 高级 SSL/TLS 配置 (用于生产环境) + */ + private final Ssl sslOptions = new Ssl(); + + /** + * 遗嘱消息 (Last Will and Testament) + */ + @Data + public static class Will { + + /** + * 是否启用遗嘱消息 + */ + private boolean enabled = false; + /** + * 遗嘱消息主题 + */ + private String topic; + /** + * 遗嘱消息内容 + */ + private String payload; + /** + * 遗嘱消息 QoS 等级 + */ + private Integer qos = 1; + /** + * 遗嘱消息是否作为保留消息发布 + */ + private boolean retain = true; + + } + + /** + * 高级 SSL/TLS 配置 + */ + @Data + public static class Ssl { + + /** + * 密钥库(KeyStore)路径,例如:classpath:certs/client.jks + * 包含客户端自己的证书和私钥,用于向服务端证明身份(双向认证)。 + */ + private String keyStorePath; + /** + * 密钥库密码 + */ + private String keyStorePassword; + /** + * 信任库(TrustStore)路径,例如:classpath:certs/trust.jks + * 包含服务端信任的 CA 证书,用于验证服务端的身份,防止中间人攻击。 + */ + private String trustStorePath; + /** + * 信任库密码 + */ + private String trustStorePassword; + + } + + } + + @Data + public static class TcpProperties { + + /** + * 是否开启 + */ + @NotNull(message = "是否开启不能为空") + private Boolean enabled; + + /** + * 服务器端口 + */ + private Integer port = 8091; + + /** + * 心跳超时时间(毫秒) + */ + private Long keepAliveTimeoutMs = 30000L; + + /** + * 最大连接数 + */ + private Integer maxConnections = 1000; + + /** + * 是否启用SSL + */ + private Boolean sslEnabled = false; + + /** + * SSL证书路径 + */ + private String sslCertPath; + + /** + * SSL私钥路径 + */ + private String sslKeyPath; + + } + + @Data + public static class MqttProperties { + + /** + * 是否开启 + */ + @NotNull(message = "是否开启不能为空") + private Boolean enabled; + + /** + * 服务器端口 + */ + private Integer port = 1883; + + /** + * 最大消息大小(字节) + */ + private Integer maxMessageSize = 8192; + + /** + * 连接超时时间(秒) + */ + private Integer connectTimeoutSeconds = 60; + /** + * 保持连接超时时间(秒) + */ + private Integer keepAliveTimeoutSeconds = 300; + + /** + * 是否启用 SSL + */ + private Boolean sslEnabled = false; + /** + * SSL 配置 + */ + private SslOptions sslOptions = new SslOptions(); + + /** + * SSL 配置选项 + */ + @Data + public static class SslOptions { + + /** + * 密钥证书选项 + */ + private io.vertx.core.net.KeyCertOptions keyCertOptions; + /** + * 信任选项 + */ + private io.vertx.core.net.TrustOptions trustOptions; + /** + * SSL 证书路径 + */ + private String certPath; + /** + * SSL 私钥路径 + */ + private String keyPath; + /** + * 信任存储路径 + */ + private String trustStorePath; + /** + * 信任存储密码 + */ + private String trustStorePassword; + + } + + } + +} diff --git a/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/protocol/http/IotHttpUpstreamProtocol.java b/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/protocol/http/IotHttpUpstreamProtocol.java index 5137b82..c2f15b0 100644 --- a/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/protocol/http/IotHttpUpstreamProtocol.java +++ b/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/protocol/http/IotHttpUpstreamProtocol.java @@ -1,86 +1,95 @@ -package com.viewsh.module.iot.gateway.protocol.http; - -import com.viewsh.module.iot.core.util.IotDeviceMessageUtils; -import com.viewsh.module.iot.gateway.config.IotGatewayProperties; -import com.viewsh.module.iot.gateway.protocol.http.router.IotHttpAuthHandler; -import com.viewsh.module.iot.gateway.protocol.http.router.IotHttpUpstreamHandler; -import io.vertx.core.AbstractVerticle; -import io.vertx.core.Vertx; -import io.vertx.core.http.HttpServer; -import io.vertx.core.http.HttpServerOptions; -import io.vertx.core.net.PemKeyCertOptions; -import io.vertx.ext.web.Router; -import io.vertx.ext.web.handler.BodyHandler; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; - -/** - * IoT 网关 HTTP 协议:接收设备上行消息 - * - * @author 芋道源码 - */ -@Slf4j -public class IotHttpUpstreamProtocol extends AbstractVerticle { - - private final IotGatewayProperties.HttpProperties httpProperties; - - private HttpServer httpServer; - - @Getter - private final String serverId; - - public IotHttpUpstreamProtocol(IotGatewayProperties.HttpProperties httpProperties) { - this.httpProperties = httpProperties; - this.serverId = IotDeviceMessageUtils.generateServerId(httpProperties.getServerPort()); - } - - @Override - @PostConstruct - public void start() { - // 创建路由 - Vertx vertx = Vertx.vertx(); - Router router = Router.router(vertx); - router.route().handler(BodyHandler.create()); - - // 创建处理器,添加路由处理器 - IotHttpAuthHandler authHandler = new IotHttpAuthHandler(this); - router.post(IotHttpAuthHandler.PATH).handler(authHandler); - IotHttpUpstreamHandler upstreamHandler = new IotHttpUpstreamHandler(this); - router.post(IotHttpUpstreamHandler.PATH).handler(upstreamHandler); - - // 启动 HTTP 服务器 - HttpServerOptions options = new HttpServerOptions() - .setPort(httpProperties.getServerPort()); - if (Boolean.TRUE.equals(httpProperties.getSslEnabled())) { - PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions().setKeyPath(httpProperties.getSslKeyPath()) - .setCertPath(httpProperties.getSslCertPath()); - options = options.setSsl(true).setKeyCertOptions(pemKeyCertOptions); - } - try { - httpServer = vertx.createHttpServer(options) - .requestHandler(router) - .listen() - .result(); - log.info("[start][IoT 网关 HTTP 协议启动成功,端口:{}]", httpProperties.getServerPort()); - } catch (Exception e) { - log.error("[start][IoT 网关 HTTP 协议启动失败]", e); - throw e; - } - } - - @Override - @PreDestroy - public void stop() { - if (httpServer != null) { - try { - httpServer.close().result(); - log.info("[stop][IoT 网关 HTTP 协议已停止]"); - } catch (Exception e) { - log.error("[stop][IoT 网关 HTTP 协议停止失败]", e); - } - } - } - -} +package com.viewsh.module.iot.gateway.protocol.http; + +import cn.hutool.core.util.StrUtil; +import com.viewsh.module.iot.core.util.IotDeviceMessageUtils; +import com.viewsh.module.iot.gateway.config.IotGatewayProperties; +import com.viewsh.module.iot.gateway.protocol.http.router.IotCameraUpstreamHandler; +import com.viewsh.module.iot.gateway.protocol.http.router.IotHttpAuthHandler; +import com.viewsh.module.iot.gateway.protocol.http.router.IotHttpUpstreamHandler; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpServer; +import io.vertx.core.http.HttpServerOptions; +import io.vertx.core.net.PemKeyCertOptions; +import io.vertx.ext.web.Router; +import io.vertx.ext.web.handler.BodyHandler; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +/** + * IoT 网关 HTTP 协议:接收设备上行消息 + * + * @author 芋道源码 + */ +@Slf4j +public class IotHttpUpstreamProtocol extends AbstractVerticle { + + private final IotGatewayProperties.HttpProperties httpProperties; + + private HttpServer httpServer; + + @Getter + private final String serverId; + + public IotHttpUpstreamProtocol(IotGatewayProperties.HttpProperties httpProperties) { + this.httpProperties = httpProperties; + this.serverId = IotDeviceMessageUtils.generateServerId(httpProperties.getServerPort()); + } + + @Override + @PostConstruct + public void start() { + // 创建路由 + Vertx vertx = Vertx.vertx(); + Router router = Router.router(vertx); + router.route().handler(BodyHandler.create()); + + // 创建处理器,添加路由处理器 + IotHttpAuthHandler authHandler = new IotHttpAuthHandler(this); + router.post(IotHttpAuthHandler.PATH).handler(authHandler); + IotHttpUpstreamHandler upstreamHandler = new IotHttpUpstreamHandler(this); + router.post(IotHttpUpstreamHandler.PATH).handler(upstreamHandler); + // 3D11 单目客流计数器专用路由(仅在配置了 productKey 时启用) + String camera3d11ProductKey = httpProperties.getCamera3d11ProductKey(); + if (StrUtil.isNotBlank(camera3d11ProductKey)) { + IotCameraUpstreamHandler cameraHandler = new IotCameraUpstreamHandler(this, camera3d11ProductKey); + router.post(IotCameraUpstreamHandler.PATH_HEARTBEAT).handler(cameraHandler); + router.post(IotCameraUpstreamHandler.PATH_DATA_UPLOAD).handler(cameraHandler); + } + + // 启动 HTTP 服务器 + HttpServerOptions options = new HttpServerOptions() + .setPort(httpProperties.getServerPort()); + if (Boolean.TRUE.equals(httpProperties.getSslEnabled())) { + PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions().setKeyPath(httpProperties.getSslKeyPath()) + .setCertPath(httpProperties.getSslCertPath()); + options = options.setSsl(true).setKeyCertOptions(pemKeyCertOptions); + } + try { + httpServer = vertx.createHttpServer(options) + .requestHandler(router) + .listen() + .result(); + log.info("[start][IoT 网关 HTTP 协议启动成功,端口:{}]", httpProperties.getServerPort()); + } catch (Exception e) { + log.error("[start][IoT 网关 HTTP 协议启动失败]", e); + throw e; + } + } + + @Override + @PreDestroy + public void stop() { + if (httpServer != null) { + try { + httpServer.close().result(); + log.info("[stop][IoT 网关 HTTP 协议已停止]"); + } catch (Exception e) { + log.error("[stop][IoT 网关 HTTP 协议停止失败]", e); + } + } + } + +} diff --git a/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/protocol/http/router/IotCameraUpstreamHandler.java b/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/protocol/http/router/IotCameraUpstreamHandler.java new file mode 100644 index 0000000..3214ee0 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/protocol/http/router/IotCameraUpstreamHandler.java @@ -0,0 +1,150 @@ +package com.viewsh.module.iot.gateway.protocol.http.router; + +import cn.hutool.core.map.MapUtil; +import cn.hutool.extra.spring.SpringUtil; +import com.viewsh.framework.common.util.json.JsonUtils; +import com.viewsh.module.iot.core.biz.dto.IotDeviceRespDTO; +import com.viewsh.module.iot.core.mq.message.IotDeviceMessage; +import com.viewsh.module.iot.gateway.codec.camera3d11.IotCamera3D11Codec; +import com.viewsh.module.iot.gateway.codec.camera3d11.dto.Camera3D11DataUploadReqVO; +import com.viewsh.module.iot.gateway.codec.camera3d11.dto.Camera3D11HeartBeatReqVO; +import com.viewsh.module.iot.gateway.codec.camera3d11.dto.Camera3D11Resp; +import com.viewsh.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol; +import com.viewsh.module.iot.gateway.service.device.IotDeviceService; +import com.viewsh.module.iot.gateway.service.device.message.IotDeviceMessageService; +import io.vertx.core.Handler; +import io.vertx.ext.web.RoutingContext; +import lombok.extern.slf4j.Slf4j; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneId; +import java.util.Map; + +/** + * 3D11 单目客流计数器 — HTTP 上行处理器 + *

+ * 直接实现 {@link Handler},不继承 {@link IotHttpAbstractHandler}, + * 因其认证逻辑依赖路径参数 (productKey/deviceName),而 3D11 使用固定路径 + body 中 sn 识别设备。 + */ +@Slf4j +public class IotCameraUpstreamHandler implements Handler { + + public static final String PATH_HEARTBEAT = "/api/camera/heartBeat"; + public static final String PATH_DATA_UPLOAD = "/api/camera/dataUpload"; + + /** + * 心跳响应默认上报间隔(分钟) + */ + private static final int DEFAULT_UPLOAD_INTERVAL = 1; + + private static final ZoneId ZONE_SHANGHAI = ZoneId.of("Asia/Shanghai"); + + private final IotHttpUpstreamProtocol protocol; + private final String productKey; + private final IotDeviceService deviceService; + private final IotDeviceMessageService deviceMessageService; + + public IotCameraUpstreamHandler(IotHttpUpstreamProtocol protocol, String productKey) { + this.protocol = protocol; + this.productKey = productKey; + this.deviceService = SpringUtil.getBean(IotDeviceService.class); + this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class); + } + + @Override + public void handle(RoutingContext context) { + try { + String path = context.request().path(); + if (PATH_HEARTBEAT.equals(path)) { + handleHeartBeat(context); + } else if (PATH_DATA_UPLOAD.equals(path)) { + handleDataUpload(context); + } else { + IotHttpAbstractHandler.writeResponse(context, Camera3D11Resp.error("未知路径")); + } + } catch (Exception e) { + log.error("[handle][3D11 处理异常, path={}]", context.request().path(), e); + if (!context.response().ended()) { + IotHttpAbstractHandler.writeResponse(context, Camera3D11Resp.error("服务器内部错误")); + } + } + } + + /** + * 心跳处理 + */ + private void handleHeartBeat(RoutingContext context) { + // 1. 解析请求 + Camera3D11HeartBeatReqVO req = JsonUtils.parseObject( + context.body().buffer().getBytes(), Camera3D11HeartBeatReqVO.class); + if (req == null || req.getSn() == null) { + IotHttpAbstractHandler.writeResponse(context, Camera3D11Resp.error("参数错误")); + return; + } + + // 2. 校验设备存在 + IotDeviceRespDTO device = deviceService.getDeviceFromCache(productKey, req.getSn()); + if (device == null) { + IotHttpAbstractHandler.writeResponse(context, Camera3D11Resp.snNotFound()); + return; + } + + // 3. 发送上线消息 + IotDeviceMessage onlineMsg = IotDeviceMessage.buildStateUpdateOnline(); + deviceMessageService.sendDeviceMessage(onlineMsg, productKey, req.getSn(), protocol.getServerId()); + + // 4. 构建心跳响应 + LocalDate today = LocalDate.now(); + long dataStartTime = today.atTime(LocalTime.of(8, 0)).atZone(ZONE_SHANGHAI).toEpochSecond(); + long dataEndTime = today.atTime(LocalTime.of(23, 0)).atZone(ZONE_SHANGHAI).toEpochSecond(); + long currentTime = LocalDateTime.now().atZone(ZONE_SHANGHAI).toEpochSecond(); + + Map data = MapUtil.newHashMap(); + data.put("sn", req.getSn()); + data.put("dataStartTime", dataStartTime); + data.put("dataEndTime", dataEndTime); + data.put("uploadInterval", DEFAULT_UPLOAD_INTERVAL); + data.put("time", currentTime); + + IotHttpAbstractHandler.writeResponse(context, Camera3D11Resp.success(data)); + } + + /** + * 数据上报处理 + */ + private void handleDataUpload(RoutingContext context) { + // 1. 解析请求 + byte[] bytes = context.body().buffer().getBytes(); + Camera3D11DataUploadReqVO req = JsonUtils.parseObject(bytes, Camera3D11DataUploadReqVO.class); + if (req == null || req.getSn() == null) { + IotHttpAbstractHandler.writeResponse(context, Camera3D11Resp.error("参数错误")); + return; + } + + // 2. 校验设备存在 + IotDeviceRespDTO device = deviceService.getDeviceFromCache(productKey, req.getSn()); + if (device == null) { + IotHttpAbstractHandler.writeResponse(context, Camera3D11Resp.snNotFound()); + return; + } + + // 3. 解码并发送消息 + IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(bytes, IotCamera3D11Codec.TYPE); + if (message == null) { + IotHttpAbstractHandler.writeResponse(context, Camera3D11Resp.error("消息解码失败")); + return; + } + deviceMessageService.sendDeviceMessage(message, productKey, req.getSn(), null); + + // 4. 构建响应 + long currentTime = LocalDateTime.now().atZone(ZONE_SHANGHAI).toEpochSecond(); + Map data = MapUtil.newHashMap(); + data.put("sn", req.getSn()); + data.put("time", currentTime); + + IotHttpAbstractHandler.writeResponse(context, Camera3D11Resp.success(data)); + } + +} diff --git a/viewsh-module-iot/viewsh-module-iot-gateway/src/main/resources/application.yaml b/viewsh-module-iot/viewsh-module-iot-gateway/src/main/resources/application.yaml index b09f713..c671bba 100644 --- a/viewsh-module-iot/viewsh-module-iot-gateway/src/main/resources/application.yaml +++ b/viewsh-module-iot/viewsh-module-iot-gateway/src/main/resources/application.yaml @@ -54,6 +54,7 @@ viewsh: http: enabled: true server-port: 8092 + camera3d11-product-key: camera_3d11 # 3D11 单目客流计数器的产品 Key # ==================================== # 针对引入的 EMQX 组件的配置 # ====================================