Merge branch 'feat/iot-people-counter'
This commit is contained in:
@@ -0,0 +1,68 @@
|
||||
package com.viewsh.module.iot.gateway.codec.camera3d11;
|
||||
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import com.viewsh.framework.common.util.json.JsonUtils;
|
||||
import com.viewsh.module.iot.core.enums.IotDeviceMessageMethodEnum;
|
||||
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import com.viewsh.module.iot.gateway.codec.IotDeviceMessageCodec;
|
||||
import com.viewsh.module.iot.gateway.codec.camera3d11.dto.Camera3D11DataUploadReqVO;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 3D11 单目客流计数器 编解码器
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class IotCamera3D11Codec implements IotDeviceMessageCodec {
|
||||
|
||||
public static final String TYPE = "CAMERA_3D11";
|
||||
|
||||
@Override
|
||||
public String type() {
|
||||
return TYPE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IotDeviceMessage decode(byte[] bytes) {
|
||||
// 1. 解析 JSON
|
||||
Camera3D11DataUploadReqVO payload;
|
||||
try {
|
||||
payload = JsonUtils.parseObject(bytes, Camera3D11DataUploadReqVO.class);
|
||||
} catch (Exception e) {
|
||||
log.error("[decode][解析 3D11 数据失败]", e);
|
||||
throw new IllegalArgumentException("JSON 格式错误");
|
||||
}
|
||||
Assert.notNull(payload, "消息内容不能为空");
|
||||
|
||||
// 2. 构建属性参数
|
||||
Map<String, Object> params = MapUtil.newHashMap();
|
||||
params.put("people_in", payload.getPeopleIn());
|
||||
params.put("people_out", payload.getPeopleOut());
|
||||
params.put("stat_start_time", payload.getStartTime());
|
||||
params.put("stat_end_time", payload.getEndTime());
|
||||
|
||||
// 3. 使用 endTime 作为上报时间
|
||||
LocalDateTime reportTime = LocalDateTime.now();
|
||||
if (payload.getEndTime() != null) {
|
||||
reportTime = LocalDateTime.ofInstant(
|
||||
Instant.ofEpochSecond(payload.getEndTime()), ZoneId.systemDefault());
|
||||
}
|
||||
|
||||
return IotDeviceMessage.requestOf(IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), params)
|
||||
.setReportTime(reportTime);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] encode(IotDeviceMessage message) {
|
||||
// 3D11 不需要下行指令
|
||||
return new byte[0];
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,44 @@
|
||||
package com.viewsh.module.iot.gateway.codec.camera3d11.dto;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* 3D11 单目客流计数器 — 数据上报请求
|
||||
*/
|
||||
@Data
|
||||
public class Camera3D11DataUploadReqVO {
|
||||
|
||||
/**
|
||||
* 设备序列号
|
||||
*/
|
||||
private String sn;
|
||||
|
||||
/**
|
||||
* 统计开始时间(Unix 秒)
|
||||
*/
|
||||
private Long startTime;
|
||||
|
||||
/**
|
||||
* 统计结束时间(Unix 秒)
|
||||
*/
|
||||
private Long endTime;
|
||||
|
||||
/**
|
||||
* 上传时间(Unix 秒)
|
||||
*/
|
||||
private Long time;
|
||||
|
||||
/**
|
||||
* 进客流
|
||||
*/
|
||||
@JsonProperty("in")
|
||||
private Integer peopleIn;
|
||||
|
||||
/**
|
||||
* 出客流
|
||||
*/
|
||||
@JsonProperty("out")
|
||||
private Integer peopleOut;
|
||||
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
package com.viewsh.module.iot.gateway.codec.camera3d11.dto;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* 3D11 单目客流计数器 — 心跳请求
|
||||
*/
|
||||
@Data
|
||||
public class Camera3D11HeartBeatReqVO {
|
||||
|
||||
/**
|
||||
* 设备序列号
|
||||
*/
|
||||
private String sn;
|
||||
|
||||
/**
|
||||
* Unix 时间戳(秒)
|
||||
*/
|
||||
private Long time;
|
||||
|
||||
}
|
||||
@@ -0,0 +1,36 @@
|
||||
package com.viewsh.module.iot.gateway.codec.camera3d11.dto;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* 3D11 单目客流计数器 — 统一响应
|
||||
*/
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public class Camera3D11Resp {
|
||||
|
||||
/**
|
||||
* 响应码:0=成功 1=SN不存在 2=失败
|
||||
*/
|
||||
private Integer code;
|
||||
|
||||
private String msg;
|
||||
|
||||
private Object data;
|
||||
|
||||
public static Camera3D11Resp success(Object data) {
|
||||
return new Camera3D11Resp(0, "success", data);
|
||||
}
|
||||
|
||||
public static Camera3D11Resp snNotFound() {
|
||||
return new Camera3D11Resp(1, "sn不存在", null);
|
||||
}
|
||||
|
||||
public static Camera3D11Resp error(String msg) {
|
||||
return new Camera3D11Resp(2, msg, null);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,405 +1,412 @@
|
||||
package com.viewsh.module.iot.gateway.config;
|
||||
|
||||
import jakarta.validation.constraints.NotEmpty;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.Data;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
|
||||
@ConfigurationProperties(prefix = "viewsh.iot.gateway")
|
||||
@Validated
|
||||
@Data
|
||||
public class IotGatewayProperties {
|
||||
|
||||
/**
|
||||
* 设备 RPC 服务配置
|
||||
*/
|
||||
private RpcProperties rpc;
|
||||
/**
|
||||
* Token 配置
|
||||
*/
|
||||
private TokenProperties token;
|
||||
|
||||
/**
|
||||
* 协议配置
|
||||
*/
|
||||
private ProtocolProperties protocol;
|
||||
|
||||
@Data
|
||||
public static class RpcProperties {
|
||||
|
||||
/**
|
||||
* 主程序 API 地址
|
||||
*/
|
||||
@NotEmpty(message = "主程序 API 地址不能为空")
|
||||
private String url;
|
||||
/**
|
||||
* 连接超时时间
|
||||
*/
|
||||
@NotNull(message = "连接超时时间不能为空")
|
||||
private Duration connectTimeout;
|
||||
/**
|
||||
* 读取超时时间
|
||||
*/
|
||||
@NotNull(message = "读取超时时间不能为空")
|
||||
private Duration readTimeout;
|
||||
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class TokenProperties {
|
||||
|
||||
/**
|
||||
* 密钥
|
||||
*/
|
||||
@NotEmpty(message = "密钥不能为空")
|
||||
private String secret;
|
||||
/**
|
||||
* 令牌有效期
|
||||
*/
|
||||
@NotNull(message = "令牌有效期不能为空")
|
||||
private Duration expiration;
|
||||
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class ProtocolProperties {
|
||||
|
||||
/**
|
||||
* HTTP 组件配置
|
||||
*/
|
||||
private HttpProperties http;
|
||||
|
||||
/**
|
||||
* EMQX 组件配置
|
||||
*/
|
||||
private EmqxProperties emqx;
|
||||
|
||||
/**
|
||||
* TCP 组件配置
|
||||
*/
|
||||
private TcpProperties tcp;
|
||||
|
||||
/**
|
||||
* MQTT 组件配置
|
||||
*/
|
||||
private MqttProperties mqtt;
|
||||
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class HttpProperties {
|
||||
|
||||
/**
|
||||
* 是否开启
|
||||
*/
|
||||
@NotNull(message = "是否开启不能为空")
|
||||
private Boolean enabled;
|
||||
/**
|
||||
* 服务端口
|
||||
*/
|
||||
private Integer serverPort;
|
||||
|
||||
/**
|
||||
* 是否开启 SSL
|
||||
*/
|
||||
@NotNull(message = "是否开启 SSL 不能为空")
|
||||
private Boolean sslEnabled = false;
|
||||
|
||||
/**
|
||||
* SSL 证书路径
|
||||
*/
|
||||
private String sslKeyPath;
|
||||
/**
|
||||
* SSL 证书路径
|
||||
*/
|
||||
private String sslCertPath;
|
||||
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class EmqxProperties {
|
||||
|
||||
/**
|
||||
* 是否开启
|
||||
*/
|
||||
@NotNull(message = "是否开启不能为空")
|
||||
private Boolean enabled;
|
||||
|
||||
/**
|
||||
* HTTP 服务端口(默认:8090)
|
||||
*/
|
||||
private Integer httpPort = 8090;
|
||||
|
||||
/**
|
||||
* MQTT 服务器地址
|
||||
*/
|
||||
@NotEmpty(message = "MQTT 服务器地址不能为空")
|
||||
private String mqttHost;
|
||||
|
||||
/**
|
||||
* MQTT 服务器端口(默认:1883)
|
||||
*/
|
||||
@NotNull(message = "MQTT 服务器端口不能为空")
|
||||
private Integer mqttPort = 1883;
|
||||
|
||||
/**
|
||||
* MQTT 用户名
|
||||
*/
|
||||
@NotEmpty(message = "MQTT 用户名不能为空")
|
||||
private String mqttUsername;
|
||||
|
||||
/**
|
||||
* MQTT 密码
|
||||
*/
|
||||
@NotEmpty(message = "MQTT 密码不能为空")
|
||||
private String mqttPassword;
|
||||
|
||||
/**
|
||||
* MQTT 客户端的 SSL 开关
|
||||
*/
|
||||
@NotNull(message = "MQTT 是否开启 SSL 不能为空")
|
||||
private Boolean mqttSsl = false;
|
||||
|
||||
/**
|
||||
* MQTT 客户端 ID(如果为空,系统将自动生成)
|
||||
*/
|
||||
@NotEmpty(message = "MQTT 客户端 ID 不能为空")
|
||||
private String mqttClientId;
|
||||
|
||||
/**
|
||||
* MQTT 订阅的主题
|
||||
*/
|
||||
@NotEmpty(message = "MQTT 主题不能为空")
|
||||
private List<@NotEmpty(message = "MQTT 主题不能为空") String> mqttTopics;
|
||||
|
||||
/**
|
||||
* 默认 QoS 级别
|
||||
* <p>
|
||||
* 0 - 最多一次
|
||||
* 1 - 至少一次
|
||||
* 2 - 刚好一次
|
||||
*/
|
||||
private Integer mqttQos = 1;
|
||||
|
||||
/**
|
||||
* 连接超时时间(秒)
|
||||
*/
|
||||
private Integer connectTimeoutSeconds = 10;
|
||||
|
||||
/**
|
||||
* 重连延迟时间(毫秒)
|
||||
*/
|
||||
private Long reconnectDelayMs = 5000L;
|
||||
|
||||
/**
|
||||
* 是否启用 Clean Session (清理会话)
|
||||
* true: 每次连接都是新会话,Broker 不保留离线消息和订阅关系。
|
||||
* 对于网关这类“永远在线”且会主动重新订阅的应用,建议为 true。
|
||||
*/
|
||||
private Boolean cleanSession = true;
|
||||
|
||||
/**
|
||||
* 心跳间隔(秒)
|
||||
* 用于保持连接活性,及时发现网络中断。
|
||||
*/
|
||||
private Integer keepAliveIntervalSeconds = 60;
|
||||
|
||||
/**
|
||||
* 最大未确认消息队列大小
|
||||
* 限制已发送但未收到 Broker 确认的 QoS 1/2 消息数量,用于流量控制。
|
||||
*/
|
||||
private Integer maxInflightQueue = 10000;
|
||||
|
||||
/**
|
||||
* 是否信任所有 SSL 证书
|
||||
* 警告:此配置会绕过证书验证,仅建议在开发和测试环境中使用!
|
||||
* 在生产环境中,应设置为 false,并配置正确的信任库。
|
||||
*/
|
||||
private Boolean trustAll = false;
|
||||
|
||||
/**
|
||||
* 遗嘱消息配置 (用于网关异常下线时通知其他系统)
|
||||
*/
|
||||
private final Will will = new Will();
|
||||
|
||||
/**
|
||||
* 高级 SSL/TLS 配置 (用于生产环境)
|
||||
*/
|
||||
private final Ssl sslOptions = new Ssl();
|
||||
|
||||
/**
|
||||
* 遗嘱消息 (Last Will and Testament)
|
||||
*/
|
||||
@Data
|
||||
public static class Will {
|
||||
|
||||
/**
|
||||
* 是否启用遗嘱消息
|
||||
*/
|
||||
private boolean enabled = false;
|
||||
/**
|
||||
* 遗嘱消息主题
|
||||
*/
|
||||
private String topic;
|
||||
/**
|
||||
* 遗嘱消息内容
|
||||
*/
|
||||
private String payload;
|
||||
/**
|
||||
* 遗嘱消息 QoS 等级
|
||||
*/
|
||||
private Integer qos = 1;
|
||||
/**
|
||||
* 遗嘱消息是否作为保留消息发布
|
||||
*/
|
||||
private boolean retain = true;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 高级 SSL/TLS 配置
|
||||
*/
|
||||
@Data
|
||||
public static class Ssl {
|
||||
|
||||
/**
|
||||
* 密钥库(KeyStore)路径,例如:classpath:certs/client.jks
|
||||
* 包含客户端自己的证书和私钥,用于向服务端证明身份(双向认证)。
|
||||
*/
|
||||
private String keyStorePath;
|
||||
/**
|
||||
* 密钥库密码
|
||||
*/
|
||||
private String keyStorePassword;
|
||||
/**
|
||||
* 信任库(TrustStore)路径,例如:classpath:certs/trust.jks
|
||||
* 包含服务端信任的 CA 证书,用于验证服务端的身份,防止中间人攻击。
|
||||
*/
|
||||
private String trustStorePath;
|
||||
/**
|
||||
* 信任库密码
|
||||
*/
|
||||
private String trustStorePassword;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class TcpProperties {
|
||||
|
||||
/**
|
||||
* 是否开启
|
||||
*/
|
||||
@NotNull(message = "是否开启不能为空")
|
||||
private Boolean enabled;
|
||||
|
||||
/**
|
||||
* 服务器端口
|
||||
*/
|
||||
private Integer port = 8091;
|
||||
|
||||
/**
|
||||
* 心跳超时时间(毫秒)
|
||||
*/
|
||||
private Long keepAliveTimeoutMs = 30000L;
|
||||
|
||||
/**
|
||||
* 最大连接数
|
||||
*/
|
||||
private Integer maxConnections = 1000;
|
||||
|
||||
/**
|
||||
* 是否启用SSL
|
||||
*/
|
||||
private Boolean sslEnabled = false;
|
||||
|
||||
/**
|
||||
* SSL证书路径
|
||||
*/
|
||||
private String sslCertPath;
|
||||
|
||||
/**
|
||||
* SSL私钥路径
|
||||
*/
|
||||
private String sslKeyPath;
|
||||
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class MqttProperties {
|
||||
|
||||
/**
|
||||
* 是否开启
|
||||
*/
|
||||
@NotNull(message = "是否开启不能为空")
|
||||
private Boolean enabled;
|
||||
|
||||
/**
|
||||
* 服务器端口
|
||||
*/
|
||||
private Integer port = 1883;
|
||||
|
||||
/**
|
||||
* 最大消息大小(字节)
|
||||
*/
|
||||
private Integer maxMessageSize = 8192;
|
||||
|
||||
/**
|
||||
* 连接超时时间(秒)
|
||||
*/
|
||||
private Integer connectTimeoutSeconds = 60;
|
||||
/**
|
||||
* 保持连接超时时间(秒)
|
||||
*/
|
||||
private Integer keepAliveTimeoutSeconds = 300;
|
||||
|
||||
/**
|
||||
* 是否启用 SSL
|
||||
*/
|
||||
private Boolean sslEnabled = false;
|
||||
/**
|
||||
* SSL 配置
|
||||
*/
|
||||
private SslOptions sslOptions = new SslOptions();
|
||||
|
||||
/**
|
||||
* SSL 配置选项
|
||||
*/
|
||||
@Data
|
||||
public static class SslOptions {
|
||||
|
||||
/**
|
||||
* 密钥证书选项
|
||||
*/
|
||||
private io.vertx.core.net.KeyCertOptions keyCertOptions;
|
||||
/**
|
||||
* 信任选项
|
||||
*/
|
||||
private io.vertx.core.net.TrustOptions trustOptions;
|
||||
/**
|
||||
* SSL 证书路径
|
||||
*/
|
||||
private String certPath;
|
||||
/**
|
||||
* SSL 私钥路径
|
||||
*/
|
||||
private String keyPath;
|
||||
/**
|
||||
* 信任存储路径
|
||||
*/
|
||||
private String trustStorePath;
|
||||
/**
|
||||
* 信任存储密码
|
||||
*/
|
||||
private String trustStorePassword;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
package com.viewsh.module.iot.gateway.config;
|
||||
|
||||
import jakarta.validation.constraints.NotEmpty;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.Data;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
|
||||
@ConfigurationProperties(prefix = "viewsh.iot.gateway")
|
||||
@Validated
|
||||
@Data
|
||||
public class IotGatewayProperties {
|
||||
|
||||
/**
|
||||
* 设备 RPC 服务配置
|
||||
*/
|
||||
private RpcProperties rpc;
|
||||
/**
|
||||
* Token 配置
|
||||
*/
|
||||
private TokenProperties token;
|
||||
|
||||
/**
|
||||
* 协议配置
|
||||
*/
|
||||
private ProtocolProperties protocol;
|
||||
|
||||
@Data
|
||||
public static class RpcProperties {
|
||||
|
||||
/**
|
||||
* 主程序 API 地址
|
||||
*/
|
||||
@NotEmpty(message = "主程序 API 地址不能为空")
|
||||
private String url;
|
||||
/**
|
||||
* 连接超时时间
|
||||
*/
|
||||
@NotNull(message = "连接超时时间不能为空")
|
||||
private Duration connectTimeout;
|
||||
/**
|
||||
* 读取超时时间
|
||||
*/
|
||||
@NotNull(message = "读取超时时间不能为空")
|
||||
private Duration readTimeout;
|
||||
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class TokenProperties {
|
||||
|
||||
/**
|
||||
* 密钥
|
||||
*/
|
||||
@NotEmpty(message = "密钥不能为空")
|
||||
private String secret;
|
||||
/**
|
||||
* 令牌有效期
|
||||
*/
|
||||
@NotNull(message = "令牌有效期不能为空")
|
||||
private Duration expiration;
|
||||
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class ProtocolProperties {
|
||||
|
||||
/**
|
||||
* HTTP 组件配置
|
||||
*/
|
||||
private HttpProperties http;
|
||||
|
||||
/**
|
||||
* EMQX 组件配置
|
||||
*/
|
||||
private EmqxProperties emqx;
|
||||
|
||||
/**
|
||||
* TCP 组件配置
|
||||
*/
|
||||
private TcpProperties tcp;
|
||||
|
||||
/**
|
||||
* MQTT 组件配置
|
||||
*/
|
||||
private MqttProperties mqtt;
|
||||
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class HttpProperties {
|
||||
|
||||
/**
|
||||
* 是否开启
|
||||
*/
|
||||
@NotNull(message = "是否开启不能为空")
|
||||
private Boolean enabled;
|
||||
/**
|
||||
* 服务端口
|
||||
*/
|
||||
private Integer serverPort;
|
||||
|
||||
/**
|
||||
* 是否开启 SSL
|
||||
*/
|
||||
@NotNull(message = "是否开启 SSL 不能为空")
|
||||
private Boolean sslEnabled = false;
|
||||
|
||||
/**
|
||||
* SSL 证书路径
|
||||
*/
|
||||
private String sslKeyPath;
|
||||
/**
|
||||
* SSL 证书路径
|
||||
*/
|
||||
private String sslCertPath;
|
||||
|
||||
/**
|
||||
* 3D11 单目客流计数器的产品 Key
|
||||
*
|
||||
* 设备通过 /api/camera/* 路径上报,需要配置此值用于设备查找
|
||||
*/
|
||||
private String camera3d11ProductKey;
|
||||
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class EmqxProperties {
|
||||
|
||||
/**
|
||||
* 是否开启
|
||||
*/
|
||||
@NotNull(message = "是否开启不能为空")
|
||||
private Boolean enabled;
|
||||
|
||||
/**
|
||||
* HTTP 服务端口(默认:8090)
|
||||
*/
|
||||
private Integer httpPort = 8090;
|
||||
|
||||
/**
|
||||
* MQTT 服务器地址
|
||||
*/
|
||||
@NotEmpty(message = "MQTT 服务器地址不能为空")
|
||||
private String mqttHost;
|
||||
|
||||
/**
|
||||
* MQTT 服务器端口(默认:1883)
|
||||
*/
|
||||
@NotNull(message = "MQTT 服务器端口不能为空")
|
||||
private Integer mqttPort = 1883;
|
||||
|
||||
/**
|
||||
* MQTT 用户名
|
||||
*/
|
||||
@NotEmpty(message = "MQTT 用户名不能为空")
|
||||
private String mqttUsername;
|
||||
|
||||
/**
|
||||
* MQTT 密码
|
||||
*/
|
||||
@NotEmpty(message = "MQTT 密码不能为空")
|
||||
private String mqttPassword;
|
||||
|
||||
/**
|
||||
* MQTT 客户端的 SSL 开关
|
||||
*/
|
||||
@NotNull(message = "MQTT 是否开启 SSL 不能为空")
|
||||
private Boolean mqttSsl = false;
|
||||
|
||||
/**
|
||||
* MQTT 客户端 ID(如果为空,系统将自动生成)
|
||||
*/
|
||||
@NotEmpty(message = "MQTT 客户端 ID 不能为空")
|
||||
private String mqttClientId;
|
||||
|
||||
/**
|
||||
* MQTT 订阅的主题
|
||||
*/
|
||||
@NotEmpty(message = "MQTT 主题不能为空")
|
||||
private List<@NotEmpty(message = "MQTT 主题不能为空") String> mqttTopics;
|
||||
|
||||
/**
|
||||
* 默认 QoS 级别
|
||||
* <p>
|
||||
* 0 - 最多一次
|
||||
* 1 - 至少一次
|
||||
* 2 - 刚好一次
|
||||
*/
|
||||
private Integer mqttQos = 1;
|
||||
|
||||
/**
|
||||
* 连接超时时间(秒)
|
||||
*/
|
||||
private Integer connectTimeoutSeconds = 10;
|
||||
|
||||
/**
|
||||
* 重连延迟时间(毫秒)
|
||||
*/
|
||||
private Long reconnectDelayMs = 5000L;
|
||||
|
||||
/**
|
||||
* 是否启用 Clean Session (清理会话)
|
||||
* true: 每次连接都是新会话,Broker 不保留离线消息和订阅关系。
|
||||
* 对于网关这类“永远在线”且会主动重新订阅的应用,建议为 true。
|
||||
*/
|
||||
private Boolean cleanSession = true;
|
||||
|
||||
/**
|
||||
* 心跳间隔(秒)
|
||||
* 用于保持连接活性,及时发现网络中断。
|
||||
*/
|
||||
private Integer keepAliveIntervalSeconds = 60;
|
||||
|
||||
/**
|
||||
* 最大未确认消息队列大小
|
||||
* 限制已发送但未收到 Broker 确认的 QoS 1/2 消息数量,用于流量控制。
|
||||
*/
|
||||
private Integer maxInflightQueue = 10000;
|
||||
|
||||
/**
|
||||
* 是否信任所有 SSL 证书
|
||||
* 警告:此配置会绕过证书验证,仅建议在开发和测试环境中使用!
|
||||
* 在生产环境中,应设置为 false,并配置正确的信任库。
|
||||
*/
|
||||
private Boolean trustAll = false;
|
||||
|
||||
/**
|
||||
* 遗嘱消息配置 (用于网关异常下线时通知其他系统)
|
||||
*/
|
||||
private final Will will = new Will();
|
||||
|
||||
/**
|
||||
* 高级 SSL/TLS 配置 (用于生产环境)
|
||||
*/
|
||||
private final Ssl sslOptions = new Ssl();
|
||||
|
||||
/**
|
||||
* 遗嘱消息 (Last Will and Testament)
|
||||
*/
|
||||
@Data
|
||||
public static class Will {
|
||||
|
||||
/**
|
||||
* 是否启用遗嘱消息
|
||||
*/
|
||||
private boolean enabled = false;
|
||||
/**
|
||||
* 遗嘱消息主题
|
||||
*/
|
||||
private String topic;
|
||||
/**
|
||||
* 遗嘱消息内容
|
||||
*/
|
||||
private String payload;
|
||||
/**
|
||||
* 遗嘱消息 QoS 等级
|
||||
*/
|
||||
private Integer qos = 1;
|
||||
/**
|
||||
* 遗嘱消息是否作为保留消息发布
|
||||
*/
|
||||
private boolean retain = true;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 高级 SSL/TLS 配置
|
||||
*/
|
||||
@Data
|
||||
public static class Ssl {
|
||||
|
||||
/**
|
||||
* 密钥库(KeyStore)路径,例如:classpath:certs/client.jks
|
||||
* 包含客户端自己的证书和私钥,用于向服务端证明身份(双向认证)。
|
||||
*/
|
||||
private String keyStorePath;
|
||||
/**
|
||||
* 密钥库密码
|
||||
*/
|
||||
private String keyStorePassword;
|
||||
/**
|
||||
* 信任库(TrustStore)路径,例如:classpath:certs/trust.jks
|
||||
* 包含服务端信任的 CA 证书,用于验证服务端的身份,防止中间人攻击。
|
||||
*/
|
||||
private String trustStorePath;
|
||||
/**
|
||||
* 信任库密码
|
||||
*/
|
||||
private String trustStorePassword;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class TcpProperties {
|
||||
|
||||
/**
|
||||
* 是否开启
|
||||
*/
|
||||
@NotNull(message = "是否开启不能为空")
|
||||
private Boolean enabled;
|
||||
|
||||
/**
|
||||
* 服务器端口
|
||||
*/
|
||||
private Integer port = 8091;
|
||||
|
||||
/**
|
||||
* 心跳超时时间(毫秒)
|
||||
*/
|
||||
private Long keepAliveTimeoutMs = 30000L;
|
||||
|
||||
/**
|
||||
* 最大连接数
|
||||
*/
|
||||
private Integer maxConnections = 1000;
|
||||
|
||||
/**
|
||||
* 是否启用SSL
|
||||
*/
|
||||
private Boolean sslEnabled = false;
|
||||
|
||||
/**
|
||||
* SSL证书路径
|
||||
*/
|
||||
private String sslCertPath;
|
||||
|
||||
/**
|
||||
* SSL私钥路径
|
||||
*/
|
||||
private String sslKeyPath;
|
||||
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class MqttProperties {
|
||||
|
||||
/**
|
||||
* 是否开启
|
||||
*/
|
||||
@NotNull(message = "是否开启不能为空")
|
||||
private Boolean enabled;
|
||||
|
||||
/**
|
||||
* 服务器端口
|
||||
*/
|
||||
private Integer port = 1883;
|
||||
|
||||
/**
|
||||
* 最大消息大小(字节)
|
||||
*/
|
||||
private Integer maxMessageSize = 8192;
|
||||
|
||||
/**
|
||||
* 连接超时时间(秒)
|
||||
*/
|
||||
private Integer connectTimeoutSeconds = 60;
|
||||
/**
|
||||
* 保持连接超时时间(秒)
|
||||
*/
|
||||
private Integer keepAliveTimeoutSeconds = 300;
|
||||
|
||||
/**
|
||||
* 是否启用 SSL
|
||||
*/
|
||||
private Boolean sslEnabled = false;
|
||||
/**
|
||||
* SSL 配置
|
||||
*/
|
||||
private SslOptions sslOptions = new SslOptions();
|
||||
|
||||
/**
|
||||
* SSL 配置选项
|
||||
*/
|
||||
@Data
|
||||
public static class SslOptions {
|
||||
|
||||
/**
|
||||
* 密钥证书选项
|
||||
*/
|
||||
private io.vertx.core.net.KeyCertOptions keyCertOptions;
|
||||
/**
|
||||
* 信任选项
|
||||
*/
|
||||
private io.vertx.core.net.TrustOptions trustOptions;
|
||||
/**
|
||||
* SSL 证书路径
|
||||
*/
|
||||
private String certPath;
|
||||
/**
|
||||
* SSL 私钥路径
|
||||
*/
|
||||
private String keyPath;
|
||||
/**
|
||||
* 信任存储路径
|
||||
*/
|
||||
private String trustStorePath;
|
||||
/**
|
||||
* 信任存储密码
|
||||
*/
|
||||
private String trustStorePassword;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,86 +1,95 @@
|
||||
package com.viewsh.module.iot.gateway.protocol.http;
|
||||
|
||||
import com.viewsh.module.iot.core.util.IotDeviceMessageUtils;
|
||||
import com.viewsh.module.iot.gateway.config.IotGatewayProperties;
|
||||
import com.viewsh.module.iot.gateway.protocol.http.router.IotHttpAuthHandler;
|
||||
import com.viewsh.module.iot.gateway.protocol.http.router.IotHttpUpstreamHandler;
|
||||
import io.vertx.core.AbstractVerticle;
|
||||
import io.vertx.core.Vertx;
|
||||
import io.vertx.core.http.HttpServer;
|
||||
import io.vertx.core.http.HttpServerOptions;
|
||||
import io.vertx.core.net.PemKeyCertOptions;
|
||||
import io.vertx.ext.web.Router;
|
||||
import io.vertx.ext.web.handler.BodyHandler;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* IoT 网关 HTTP 协议:接收设备上行消息
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
public class IotHttpUpstreamProtocol extends AbstractVerticle {
|
||||
|
||||
private final IotGatewayProperties.HttpProperties httpProperties;
|
||||
|
||||
private HttpServer httpServer;
|
||||
|
||||
@Getter
|
||||
private final String serverId;
|
||||
|
||||
public IotHttpUpstreamProtocol(IotGatewayProperties.HttpProperties httpProperties) {
|
||||
this.httpProperties = httpProperties;
|
||||
this.serverId = IotDeviceMessageUtils.generateServerId(httpProperties.getServerPort());
|
||||
}
|
||||
|
||||
@Override
|
||||
@PostConstruct
|
||||
public void start() {
|
||||
// 创建路由
|
||||
Vertx vertx = Vertx.vertx();
|
||||
Router router = Router.router(vertx);
|
||||
router.route().handler(BodyHandler.create());
|
||||
|
||||
// 创建处理器,添加路由处理器
|
||||
IotHttpAuthHandler authHandler = new IotHttpAuthHandler(this);
|
||||
router.post(IotHttpAuthHandler.PATH).handler(authHandler);
|
||||
IotHttpUpstreamHandler upstreamHandler = new IotHttpUpstreamHandler(this);
|
||||
router.post(IotHttpUpstreamHandler.PATH).handler(upstreamHandler);
|
||||
|
||||
// 启动 HTTP 服务器
|
||||
HttpServerOptions options = new HttpServerOptions()
|
||||
.setPort(httpProperties.getServerPort());
|
||||
if (Boolean.TRUE.equals(httpProperties.getSslEnabled())) {
|
||||
PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions().setKeyPath(httpProperties.getSslKeyPath())
|
||||
.setCertPath(httpProperties.getSslCertPath());
|
||||
options = options.setSsl(true).setKeyCertOptions(pemKeyCertOptions);
|
||||
}
|
||||
try {
|
||||
httpServer = vertx.createHttpServer(options)
|
||||
.requestHandler(router)
|
||||
.listen()
|
||||
.result();
|
||||
log.info("[start][IoT 网关 HTTP 协议启动成功,端口:{}]", httpProperties.getServerPort());
|
||||
} catch (Exception e) {
|
||||
log.error("[start][IoT 网关 HTTP 协议启动失败]", e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@PreDestroy
|
||||
public void stop() {
|
||||
if (httpServer != null) {
|
||||
try {
|
||||
httpServer.close().result();
|
||||
log.info("[stop][IoT 网关 HTTP 协议已停止]");
|
||||
} catch (Exception e) {
|
||||
log.error("[stop][IoT 网关 HTTP 协议停止失败]", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
package com.viewsh.module.iot.gateway.protocol.http;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.viewsh.module.iot.core.util.IotDeviceMessageUtils;
|
||||
import com.viewsh.module.iot.gateway.config.IotGatewayProperties;
|
||||
import com.viewsh.module.iot.gateway.protocol.http.router.IotCameraUpstreamHandler;
|
||||
import com.viewsh.module.iot.gateway.protocol.http.router.IotHttpAuthHandler;
|
||||
import com.viewsh.module.iot.gateway.protocol.http.router.IotHttpUpstreamHandler;
|
||||
import io.vertx.core.AbstractVerticle;
|
||||
import io.vertx.core.Vertx;
|
||||
import io.vertx.core.http.HttpServer;
|
||||
import io.vertx.core.http.HttpServerOptions;
|
||||
import io.vertx.core.net.PemKeyCertOptions;
|
||||
import io.vertx.ext.web.Router;
|
||||
import io.vertx.ext.web.handler.BodyHandler;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* IoT 网关 HTTP 协议:接收设备上行消息
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
public class IotHttpUpstreamProtocol extends AbstractVerticle {
|
||||
|
||||
private final IotGatewayProperties.HttpProperties httpProperties;
|
||||
|
||||
private HttpServer httpServer;
|
||||
|
||||
@Getter
|
||||
private final String serverId;
|
||||
|
||||
public IotHttpUpstreamProtocol(IotGatewayProperties.HttpProperties httpProperties) {
|
||||
this.httpProperties = httpProperties;
|
||||
this.serverId = IotDeviceMessageUtils.generateServerId(httpProperties.getServerPort());
|
||||
}
|
||||
|
||||
@Override
|
||||
@PostConstruct
|
||||
public void start() {
|
||||
// 创建路由
|
||||
Vertx vertx = Vertx.vertx();
|
||||
Router router = Router.router(vertx);
|
||||
router.route().handler(BodyHandler.create());
|
||||
|
||||
// 创建处理器,添加路由处理器
|
||||
IotHttpAuthHandler authHandler = new IotHttpAuthHandler(this);
|
||||
router.post(IotHttpAuthHandler.PATH).handler(authHandler);
|
||||
IotHttpUpstreamHandler upstreamHandler = new IotHttpUpstreamHandler(this);
|
||||
router.post(IotHttpUpstreamHandler.PATH).handler(upstreamHandler);
|
||||
// 3D11 单目客流计数器专用路由(仅在配置了 productKey 时启用)
|
||||
String camera3d11ProductKey = httpProperties.getCamera3d11ProductKey();
|
||||
if (StrUtil.isNotBlank(camera3d11ProductKey)) {
|
||||
IotCameraUpstreamHandler cameraHandler = new IotCameraUpstreamHandler(this, camera3d11ProductKey);
|
||||
router.post(IotCameraUpstreamHandler.PATH_HEARTBEAT).handler(cameraHandler);
|
||||
router.post(IotCameraUpstreamHandler.PATH_DATA_UPLOAD).handler(cameraHandler);
|
||||
}
|
||||
|
||||
// 启动 HTTP 服务器
|
||||
HttpServerOptions options = new HttpServerOptions()
|
||||
.setPort(httpProperties.getServerPort());
|
||||
if (Boolean.TRUE.equals(httpProperties.getSslEnabled())) {
|
||||
PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions().setKeyPath(httpProperties.getSslKeyPath())
|
||||
.setCertPath(httpProperties.getSslCertPath());
|
||||
options = options.setSsl(true).setKeyCertOptions(pemKeyCertOptions);
|
||||
}
|
||||
try {
|
||||
httpServer = vertx.createHttpServer(options)
|
||||
.requestHandler(router)
|
||||
.listen()
|
||||
.result();
|
||||
log.info("[start][IoT 网关 HTTP 协议启动成功,端口:{}]", httpProperties.getServerPort());
|
||||
} catch (Exception e) {
|
||||
log.error("[start][IoT 网关 HTTP 协议启动失败]", e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@PreDestroy
|
||||
public void stop() {
|
||||
if (httpServer != null) {
|
||||
try {
|
||||
httpServer.close().result();
|
||||
log.info("[stop][IoT 网关 HTTP 协议已停止]");
|
||||
} catch (Exception e) {
|
||||
log.error("[stop][IoT 网关 HTTP 协议停止失败]", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,150 @@
|
||||
package com.viewsh.module.iot.gateway.protocol.http.router;
|
||||
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import com.viewsh.framework.common.util.json.JsonUtils;
|
||||
import com.viewsh.module.iot.core.biz.dto.IotDeviceRespDTO;
|
||||
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import com.viewsh.module.iot.gateway.codec.camera3d11.IotCamera3D11Codec;
|
||||
import com.viewsh.module.iot.gateway.codec.camera3d11.dto.Camera3D11DataUploadReqVO;
|
||||
import com.viewsh.module.iot.gateway.codec.camera3d11.dto.Camera3D11HeartBeatReqVO;
|
||||
import com.viewsh.module.iot.gateway.codec.camera3d11.dto.Camera3D11Resp;
|
||||
import com.viewsh.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol;
|
||||
import com.viewsh.module.iot.gateway.service.device.IotDeviceService;
|
||||
import com.viewsh.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import io.vertx.core.Handler;
|
||||
import io.vertx.ext.web.RoutingContext;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.time.LocalDate;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.LocalTime;
|
||||
import java.time.ZoneId;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 3D11 单目客流计数器 — HTTP 上行处理器
|
||||
* <p>
|
||||
* 直接实现 {@link Handler},不继承 {@link IotHttpAbstractHandler},
|
||||
* 因其认证逻辑依赖路径参数 (productKey/deviceName),而 3D11 使用固定路径 + body 中 sn 识别设备。
|
||||
*/
|
||||
@Slf4j
|
||||
public class IotCameraUpstreamHandler implements Handler<RoutingContext> {
|
||||
|
||||
public static final String PATH_HEARTBEAT = "/api/camera/heartBeat";
|
||||
public static final String PATH_DATA_UPLOAD = "/api/camera/dataUpload";
|
||||
|
||||
/**
|
||||
* 心跳响应默认上报间隔(分钟)
|
||||
*/
|
||||
private static final int DEFAULT_UPLOAD_INTERVAL = 1;
|
||||
|
||||
private static final ZoneId ZONE_SHANGHAI = ZoneId.of("Asia/Shanghai");
|
||||
|
||||
private final IotHttpUpstreamProtocol protocol;
|
||||
private final String productKey;
|
||||
private final IotDeviceService deviceService;
|
||||
private final IotDeviceMessageService deviceMessageService;
|
||||
|
||||
public IotCameraUpstreamHandler(IotHttpUpstreamProtocol protocol, String productKey) {
|
||||
this.protocol = protocol;
|
||||
this.productKey = productKey;
|
||||
this.deviceService = SpringUtil.getBean(IotDeviceService.class);
|
||||
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(RoutingContext context) {
|
||||
try {
|
||||
String path = context.request().path();
|
||||
if (PATH_HEARTBEAT.equals(path)) {
|
||||
handleHeartBeat(context);
|
||||
} else if (PATH_DATA_UPLOAD.equals(path)) {
|
||||
handleDataUpload(context);
|
||||
} else {
|
||||
IotHttpAbstractHandler.writeResponse(context, Camera3D11Resp.error("未知路径"));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[handle][3D11 处理异常, path={}]", context.request().path(), e);
|
||||
if (!context.response().ended()) {
|
||||
IotHttpAbstractHandler.writeResponse(context, Camera3D11Resp.error("服务器内部错误"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 心跳处理
|
||||
*/
|
||||
private void handleHeartBeat(RoutingContext context) {
|
||||
// 1. 解析请求
|
||||
Camera3D11HeartBeatReqVO req = JsonUtils.parseObject(
|
||||
context.body().buffer().getBytes(), Camera3D11HeartBeatReqVO.class);
|
||||
if (req == null || req.getSn() == null) {
|
||||
IotHttpAbstractHandler.writeResponse(context, Camera3D11Resp.error("参数错误"));
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 校验设备存在
|
||||
IotDeviceRespDTO device = deviceService.getDeviceFromCache(productKey, req.getSn());
|
||||
if (device == null) {
|
||||
IotHttpAbstractHandler.writeResponse(context, Camera3D11Resp.snNotFound());
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. 发送上线消息
|
||||
IotDeviceMessage onlineMsg = IotDeviceMessage.buildStateUpdateOnline();
|
||||
deviceMessageService.sendDeviceMessage(onlineMsg, productKey, req.getSn(), protocol.getServerId());
|
||||
|
||||
// 4. 构建心跳响应
|
||||
LocalDate today = LocalDate.now();
|
||||
long dataStartTime = today.atTime(LocalTime.of(8, 0)).atZone(ZONE_SHANGHAI).toEpochSecond();
|
||||
long dataEndTime = today.atTime(LocalTime.of(23, 0)).atZone(ZONE_SHANGHAI).toEpochSecond();
|
||||
long currentTime = LocalDateTime.now().atZone(ZONE_SHANGHAI).toEpochSecond();
|
||||
|
||||
Map<String, Object> data = MapUtil.newHashMap();
|
||||
data.put("sn", req.getSn());
|
||||
data.put("dataStartTime", dataStartTime);
|
||||
data.put("dataEndTime", dataEndTime);
|
||||
data.put("uploadInterval", DEFAULT_UPLOAD_INTERVAL);
|
||||
data.put("time", currentTime);
|
||||
|
||||
IotHttpAbstractHandler.writeResponse(context, Camera3D11Resp.success(data));
|
||||
}
|
||||
|
||||
/**
|
||||
* 数据上报处理
|
||||
*/
|
||||
private void handleDataUpload(RoutingContext context) {
|
||||
// 1. 解析请求
|
||||
byte[] bytes = context.body().buffer().getBytes();
|
||||
Camera3D11DataUploadReqVO req = JsonUtils.parseObject(bytes, Camera3D11DataUploadReqVO.class);
|
||||
if (req == null || req.getSn() == null) {
|
||||
IotHttpAbstractHandler.writeResponse(context, Camera3D11Resp.error("参数错误"));
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 校验设备存在
|
||||
IotDeviceRespDTO device = deviceService.getDeviceFromCache(productKey, req.getSn());
|
||||
if (device == null) {
|
||||
IotHttpAbstractHandler.writeResponse(context, Camera3D11Resp.snNotFound());
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. 解码并发送消息
|
||||
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(bytes, IotCamera3D11Codec.TYPE);
|
||||
if (message == null) {
|
||||
IotHttpAbstractHandler.writeResponse(context, Camera3D11Resp.error("消息解码失败"));
|
||||
return;
|
||||
}
|
||||
deviceMessageService.sendDeviceMessage(message, productKey, req.getSn(), null);
|
||||
|
||||
// 4. 构建响应
|
||||
long currentTime = LocalDateTime.now().atZone(ZONE_SHANGHAI).toEpochSecond();
|
||||
Map<String, Object> data = MapUtil.newHashMap();
|
||||
data.put("sn", req.getSn());
|
||||
data.put("time", currentTime);
|
||||
|
||||
IotHttpAbstractHandler.writeResponse(context, Camera3D11Resp.success(data));
|
||||
}
|
||||
|
||||
}
|
||||
@@ -54,6 +54,7 @@ viewsh:
|
||||
http:
|
||||
enabled: true
|
||||
server-port: 8092
|
||||
camera3d11-product-key: camera_3d11 # 3D11 单目客流计数器的产品 Key
|
||||
# ====================================
|
||||
# 针对引入的 EMQX 组件的配置
|
||||
# ====================================
|
||||
|
||||
Reference in New Issue
Block a user