feat(iot): 对接 3D11 单目客流计数器

在 IoT Gateway 的 Vert.x Router 上注册 /api/camera/* 专用路由,
桥接 3D11 摄像头的心跳和数据上报到现有消息总线和编解码体系。

- 新建 Camera3D11 DTO(心跳请求、数据上报请求、统一响应)
- 新建 IotCamera3D11Codec 编解码器(TYPE=CAMERA_3D11)
- 新建 IotCameraUpstreamHandler 处理心跳和数据上报
- productKey 通过 application.yaml 配置,未配置时不注册路由
- 心跳上报间隔设为 1 分钟

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
lzh
2026-02-08 00:23:44 +08:00
parent db5266d306
commit cc6b11f4e9
8 changed files with 827 additions and 491 deletions

View File

@@ -0,0 +1,68 @@
package com.viewsh.module.iot.gateway.codec.camera3d11;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.map.MapUtil;
import com.viewsh.framework.common.util.json.JsonUtils;
import com.viewsh.module.iot.core.enums.IotDeviceMessageMethodEnum;
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
import com.viewsh.module.iot.gateway.codec.IotDeviceMessageCodec;
import com.viewsh.module.iot.gateway.codec.camera3d11.dto.Camera3D11DataUploadReqVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Map;
/**
* 3D11 单目客流计数器 编解码器
*/
@Slf4j
@Component
public class IotCamera3D11Codec implements IotDeviceMessageCodec {
public static final String TYPE = "CAMERA_3D11";
@Override
public String type() {
return TYPE;
}
@Override
public IotDeviceMessage decode(byte[] bytes) {
// 1. 解析 JSON
Camera3D11DataUploadReqVO payload;
try {
payload = JsonUtils.parseObject(bytes, Camera3D11DataUploadReqVO.class);
} catch (Exception e) {
log.error("[decode][解析 3D11 数据失败]", e);
throw new IllegalArgumentException("JSON 格式错误");
}
Assert.notNull(payload, "消息内容不能为空");
// 2. 构建属性参数
Map<String, Object> params = MapUtil.newHashMap();
params.put("people_in", payload.getPeopleIn());
params.put("people_out", payload.getPeopleOut());
params.put("stat_start_time", payload.getStartTime());
params.put("stat_end_time", payload.getEndTime());
// 3. 使用 endTime 作为上报时间
LocalDateTime reportTime = LocalDateTime.now();
if (payload.getEndTime() != null) {
reportTime = LocalDateTime.ofInstant(
Instant.ofEpochSecond(payload.getEndTime()), ZoneId.systemDefault());
}
return IotDeviceMessage.requestOf(IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), params)
.setReportTime(reportTime);
}
@Override
public byte[] encode(IotDeviceMessage message) {
// 3D11 不需要下行指令
return new byte[0];
}
}

View File

@@ -0,0 +1,44 @@
package com.viewsh.module.iot.gateway.codec.camera3d11.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
/**
* 3D11 单目客流计数器 — 数据上报请求
*/
@Data
public class Camera3D11DataUploadReqVO {
/**
* 设备序列号
*/
private String sn;
/**
* 统计开始时间Unix 秒)
*/
private Long startTime;
/**
* 统计结束时间Unix 秒)
*/
private Long endTime;
/**
* 上传时间Unix 秒)
*/
private Long time;
/**
* 进客流
*/
@JsonProperty("in")
private Integer peopleIn;
/**
* 出客流
*/
@JsonProperty("out")
private Integer peopleOut;
}

View File

@@ -0,0 +1,21 @@
package com.viewsh.module.iot.gateway.codec.camera3d11.dto;
import lombok.Data;
/**
* 3D11 单目客流计数器 — 心跳请求
*/
@Data
public class Camera3D11HeartBeatReqVO {
/**
* 设备序列号
*/
private String sn;
/**
* Unix 时间戳(秒)
*/
private Long time;
}

View File

@@ -0,0 +1,36 @@
package com.viewsh.module.iot.gateway.codec.camera3d11.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 3D11 单目客流计数器 — 统一响应
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Camera3D11Resp {
/**
* 响应码0=成功 1=SN不存在 2=失败
*/
private Integer code;
private String msg;
private Object data;
public static Camera3D11Resp success(Object data) {
return new Camera3D11Resp(0, "success", data);
}
public static Camera3D11Resp snNotFound() {
return new Camera3D11Resp(1, "sn不存在", null);
}
public static Camera3D11Resp error(String msg) {
return new Camera3D11Resp(2, msg, null);
}
}

View File

@@ -1,405 +1,412 @@
package com.viewsh.module.iot.gateway.config; package com.viewsh.module.iot.gateway.config;
import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.NotNull;
import lombok.Data; import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated; import org.springframework.validation.annotation.Validated;
import java.time.Duration; import java.time.Duration;
import java.util.List; import java.util.List;
@ConfigurationProperties(prefix = "viewsh.iot.gateway") @ConfigurationProperties(prefix = "viewsh.iot.gateway")
@Validated @Validated
@Data @Data
public class IotGatewayProperties { public class IotGatewayProperties {
/** /**
* 设备 RPC 服务配置 * 设备 RPC 服务配置
*/ */
private RpcProperties rpc; private RpcProperties rpc;
/** /**
* Token 配置 * Token 配置
*/ */
private TokenProperties token; private TokenProperties token;
/** /**
* 协议配置 * 协议配置
*/ */
private ProtocolProperties protocol; private ProtocolProperties protocol;
@Data @Data
public static class RpcProperties { public static class RpcProperties {
/** /**
* 主程序 API 地址 * 主程序 API 地址
*/ */
@NotEmpty(message = "主程序 API 地址不能为空") @NotEmpty(message = "主程序 API 地址不能为空")
private String url; private String url;
/** /**
* 连接超时时间 * 连接超时时间
*/ */
@NotNull(message = "连接超时时间不能为空") @NotNull(message = "连接超时时间不能为空")
private Duration connectTimeout; private Duration connectTimeout;
/** /**
* 读取超时时间 * 读取超时时间
*/ */
@NotNull(message = "读取超时时间不能为空") @NotNull(message = "读取超时时间不能为空")
private Duration readTimeout; private Duration readTimeout;
} }
@Data @Data
public static class TokenProperties { public static class TokenProperties {
/** /**
* 密钥 * 密钥
*/ */
@NotEmpty(message = "密钥不能为空") @NotEmpty(message = "密钥不能为空")
private String secret; private String secret;
/** /**
* 令牌有效期 * 令牌有效期
*/ */
@NotNull(message = "令牌有效期不能为空") @NotNull(message = "令牌有效期不能为空")
private Duration expiration; private Duration expiration;
} }
@Data @Data
public static class ProtocolProperties { public static class ProtocolProperties {
/** /**
* HTTP 组件配置 * HTTP 组件配置
*/ */
private HttpProperties http; private HttpProperties http;
/** /**
* EMQX 组件配置 * EMQX 组件配置
*/ */
private EmqxProperties emqx; private EmqxProperties emqx;
/** /**
* TCP 组件配置 * TCP 组件配置
*/ */
private TcpProperties tcp; private TcpProperties tcp;
/** /**
* MQTT 组件配置 * MQTT 组件配置
*/ */
private MqttProperties mqtt; private MqttProperties mqtt;
} }
@Data @Data
public static class HttpProperties { public static class HttpProperties {
/** /**
* 是否开启 * 是否开启
*/ */
@NotNull(message = "是否开启不能为空") @NotNull(message = "是否开启不能为空")
private Boolean enabled; private Boolean enabled;
/** /**
* 服务端口 * 服务端口
*/ */
private Integer serverPort; private Integer serverPort;
/** /**
* 是否开启 SSL * 是否开启 SSL
*/ */
@NotNull(message = "是否开启 SSL 不能为空") @NotNull(message = "是否开启 SSL 不能为空")
private Boolean sslEnabled = false; private Boolean sslEnabled = false;
/** /**
* SSL 证书路径 * SSL 证书路径
*/ */
private String sslKeyPath; private String sslKeyPath;
/** /**
* SSL 证书路径 * SSL 证书路径
*/ */
private String sslCertPath; private String sslCertPath;
} /**
* 3D11 单目客流计数器的产品 Key
@Data *
public static class EmqxProperties { * 设备通过 /api/camera/* 路径上报,需要配置此值用于设备查找
*/
/** private String camera3d11ProductKey;
* 是否开启
*/ }
@NotNull(message = "是否开启不能为空")
private Boolean enabled; @Data
public static class EmqxProperties {
/**
* HTTP 服务端口默认8090 /**
*/ * 是否开启
private Integer httpPort = 8090; */
@NotNull(message = "是否开启不能为空")
/** private Boolean enabled;
* MQTT 服务器地址
*/ /**
@NotEmpty(message = "MQTT 服务器地址不能为空") * HTTP 服务端口默认8090
private String mqttHost; */
private Integer httpPort = 8090;
/**
* MQTT 服务器端口默认1883 /**
*/ * MQTT 服务器地址
@NotNull(message = "MQTT 服务器端口不能为空") */
private Integer mqttPort = 1883; @NotEmpty(message = "MQTT 服务器地址不能为空")
private String mqttHost;
/**
* MQTT 用户名 /**
*/ * MQTT 服务器端口默认1883
@NotEmpty(message = "MQTT 用户名不能为空") */
private String mqttUsername; @NotNull(message = "MQTT 服务器端口不能为空")
private Integer mqttPort = 1883;
/**
* MQTT 密码 /**
*/ * MQTT 用户名
@NotEmpty(message = "MQTT 密码不能为空") */
private String mqttPassword; @NotEmpty(message = "MQTT 用户名不能为空")
private String mqttUsername;
/**
* MQTT 客户端的 SSL 开关 /**
*/ * MQTT 密码
@NotNull(message = "MQTT 是否开启 SSL 不能为空") */
private Boolean mqttSsl = false; @NotEmpty(message = "MQTT 密码不能为空")
private String mqttPassword;
/**
* MQTT 客户端 ID如果为空系统将自动生成 /**
*/ * MQTT 客户端的 SSL 开关
@NotEmpty(message = "MQTT 客户端 ID 不能为空") */
private String mqttClientId; @NotNull(message = "MQTT 是否开启 SSL 不能为空")
private Boolean mqttSsl = false;
/**
* MQTT 订阅的主题 /**
*/ * MQTT 客户端 ID如果为空系统将自动生成
@NotEmpty(message = "MQTT 主题不能为空") */
private List<@NotEmpty(message = "MQTT 主题不能为空") String> mqttTopics; @NotEmpty(message = "MQTT 客户端 ID 不能为空")
private String mqttClientId;
/**
* 默认 QoS 级别 /**
* <p> * MQTT 订阅的主题
* 0 - 最多一次 */
* 1 - 至少一次 @NotEmpty(message = "MQTT 主题不能为空")
* 2 - 刚好一次 private List<@NotEmpty(message = "MQTT 主题不能为空") String> mqttTopics;
*/
private Integer mqttQos = 1; /**
* 默认 QoS 级别
/** * <p>
* 连接超时时间(秒) * 0 - 最多一次
*/ * 1 - 至少一次
private Integer connectTimeoutSeconds = 10; * 2 - 刚好一次
*/
/** private Integer mqttQos = 1;
* 重连延迟时间(毫秒)
*/ /**
private Long reconnectDelayMs = 5000L; * 连接超时时间(秒)
*/
/** private Integer connectTimeoutSeconds = 10;
* 是否启用 Clean Session (清理会话)
* true: 每次连接都是新会话Broker 不保留离线消息和订阅关系。 /**
* 对于网关这类“永远在线”且会主动重新订阅的应用,建议为 true。 * 重连延迟时间(毫秒)
*/ */
private Boolean cleanSession = true; private Long reconnectDelayMs = 5000L;
/** /**
* 心跳间隔(秒) * 是否启用 Clean Session (清理会话)
* 用于保持连接活性,及时发现网络中断 * true: 每次连接都是新会话Broker 不保留离线消息和订阅关系
*/ * 对于网关这类“永远在线”且会主动重新订阅的应用,建议为 true。
private Integer keepAliveIntervalSeconds = 60; */
private Boolean cleanSession = true;
/**
* 最大未确认消息队列大小 /**
* 限制已发送但未收到 Broker 确认的 QoS 1/2 消息数量,用于流量控制。 * 心跳间隔(秒)
*/ * 用于保持连接活性,及时发现网络中断。
private Integer maxInflightQueue = 10000; */
private Integer keepAliveIntervalSeconds = 60;
/**
* 是否信任所有 SSL 证书 /**
* 警告:此配置会绕过证书验证,仅建议在开发和测试环境中使用! * 最大未确认消息队列大小
* 在生产环境中,应设置为 false并配置正确的信任库 * 限制已发送但未收到 Broker 确认的 QoS 1/2 消息数量,用于流量控制
*/ */
private Boolean trustAll = false; private Integer maxInflightQueue = 10000;
/** /**
* 遗嘱消息配置 (用于网关异常下线时通知其他系统) * 是否信任所有 SSL 证书
*/ * 警告:此配置会绕过证书验证,仅建议在开发和测试环境中使用!
private final Will will = new Will(); * 在生产环境中,应设置为 false并配置正确的信任库。
*/
/** private Boolean trustAll = false;
* 高级 SSL/TLS 配置 (用于生产环境)
*/ /**
private final Ssl sslOptions = new Ssl(); * 遗嘱消息配置 (用于网关异常下线时通知其他系统)
*/
/** private final Will will = new Will();
* 遗嘱消息 (Last Will and Testament)
*/ /**
@Data * 高级 SSL/TLS 配置 (用于生产环境)
public static class Will { */
private final Ssl sslOptions = new Ssl();
/**
* 是否启用遗嘱消息 /**
*/ * 遗嘱消息 (Last Will and Testament)
private boolean enabled = false; */
/** @Data
* 遗嘱消息主题 public static class Will {
*/
private String topic; /**
/** * 是否启用遗嘱消息
* 遗嘱消息内容 */
*/ private boolean enabled = false;
private String payload; /**
/** * 遗嘱消息主题
* 遗嘱消息 QoS 等级 */
*/ private String topic;
private Integer qos = 1; /**
/** * 遗嘱消息内容
* 遗嘱消息是否作为保留消息发布 */
*/ private String payload;
private boolean retain = true; /**
* 遗嘱消息 QoS 等级
} */
private Integer qos = 1;
/** /**
* 高级 SSL/TLS 配置 * 遗嘱消息是否作为保留消息发布
*/ */
@Data private boolean retain = true;
public static class Ssl {
}
/**
* 密钥库KeyStore路径例如classpath:certs/client.jks /**
* 包含客户端自己的证书和私钥,用于向服务端证明身份(双向认证)。 * 高级 SSL/TLS 配置
*/ */
private String keyStorePath; @Data
/** public static class Ssl {
* 密钥库密码
*/ /**
private String keyStorePassword; * 密钥库KeyStore路径例如classpath:certs/client.jks
/** * 包含客户端自己的证书和私钥,用于向服务端证明身份(双向认证)。
* 信任库TrustStore路径例如classpath:certs/trust.jks */
* 包含服务端信任的 CA 证书,用于验证服务端的身份,防止中间人攻击。 private String keyStorePath;
*/ /**
private String trustStorePath; * 密钥库密码
/** */
* 信任库密码 private String keyStorePassword;
*/ /**
private String trustStorePassword; * 信任库TrustStore路径例如classpath:certs/trust.jks
* 包含服务端信任的 CA 证书,用于验证服务端的身份,防止中间人攻击。
} */
private String trustStorePath;
} /**
* 信任库密码
@Data */
public static class TcpProperties { private String trustStorePassword;
/** }
* 是否开启
*/ }
@NotNull(message = "是否开启不能为空")
private Boolean enabled; @Data
public static class TcpProperties {
/**
* 服务器端口 /**
*/ * 是否开启
private Integer port = 8091; */
@NotNull(message = "是否开启不能为空")
/** private Boolean enabled;
* 心跳超时时间(毫秒)
*/ /**
private Long keepAliveTimeoutMs = 30000L; * 服务器端口
*/
/** private Integer port = 8091;
* 最大连接数
*/ /**
private Integer maxConnections = 1000; * 心跳超时时间(毫秒)
*/
/** private Long keepAliveTimeoutMs = 30000L;
* 是否启用SSL
*/ /**
private Boolean sslEnabled = false; * 最大连接数
*/
/** private Integer maxConnections = 1000;
* SSL证书路径
*/ /**
private String sslCertPath; * 是否启用SSL
*/
/** private Boolean sslEnabled = false;
* SSL私钥路径
*/ /**
private String sslKeyPath; * SSL证书路径
*/
} private String sslCertPath;
@Data /**
public static class MqttProperties { * SSL私钥路径
*/
/** private String sslKeyPath;
* 是否开启
*/ }
@NotNull(message = "是否开启不能为空")
private Boolean enabled; @Data
public static class MqttProperties {
/**
* 服务器端口 /**
*/ * 是否开启
private Integer port = 1883; */
@NotNull(message = "是否开启不能为空")
/** private Boolean enabled;
* 最大消息大小(字节)
*/ /**
private Integer maxMessageSize = 8192; * 服务器端口
*/
/** private Integer port = 1883;
* 连接超时时间(秒)
*/ /**
private Integer connectTimeoutSeconds = 60; * 最大消息大小(字节)
/** */
* 保持连接超时时间(秒) private Integer maxMessageSize = 8192;
*/
private Integer keepAliveTimeoutSeconds = 300; /**
* 连接超时时间(秒)
/** */
* 是否启用 SSL private Integer connectTimeoutSeconds = 60;
*/ /**
private Boolean sslEnabled = false; * 保持连接超时时间(秒)
/** */
* SSL 配置 private Integer keepAliveTimeoutSeconds = 300;
*/
private SslOptions sslOptions = new SslOptions(); /**
* 是否启用 SSL
/** */
* SSL 配置选项 private Boolean sslEnabled = false;
*/ /**
@Data * SSL 配置
public static class SslOptions { */
private SslOptions sslOptions = new SslOptions();
/**
* 密钥证书选项 /**
*/ * SSL 配置选项
private io.vertx.core.net.KeyCertOptions keyCertOptions; */
/** @Data
* 信任选项 public static class SslOptions {
*/
private io.vertx.core.net.TrustOptions trustOptions; /**
/** * 密钥证书选项
* SSL 证书路径 */
*/ private io.vertx.core.net.KeyCertOptions keyCertOptions;
private String certPath; /**
/** * 信任选项
* SSL 私钥路径 */
*/ private io.vertx.core.net.TrustOptions trustOptions;
private String keyPath; /**
/** * SSL 证书路径
* 信任存储路径 */
*/ private String certPath;
private String trustStorePath; /**
/** * SSL 私钥路径
* 信任存储密码 */
*/ private String keyPath;
private String trustStorePassword; /**
* 信任存储路径
} */
private String trustStorePath;
} /**
* 信任存储密码
} */
private String trustStorePassword;
}
}
}

View File

@@ -1,86 +1,95 @@
package com.viewsh.module.iot.gateway.protocol.http; package com.viewsh.module.iot.gateway.protocol.http;
import com.viewsh.module.iot.core.util.IotDeviceMessageUtils; import cn.hutool.core.util.StrUtil;
import com.viewsh.module.iot.gateway.config.IotGatewayProperties; import com.viewsh.module.iot.core.util.IotDeviceMessageUtils;
import com.viewsh.module.iot.gateway.protocol.http.router.IotHttpAuthHandler; import com.viewsh.module.iot.gateway.config.IotGatewayProperties;
import com.viewsh.module.iot.gateway.protocol.http.router.IotHttpUpstreamHandler; import com.viewsh.module.iot.gateway.protocol.http.router.IotCameraUpstreamHandler;
import io.vertx.core.AbstractVerticle; import com.viewsh.module.iot.gateway.protocol.http.router.IotHttpAuthHandler;
import io.vertx.core.Vertx; import com.viewsh.module.iot.gateway.protocol.http.router.IotHttpUpstreamHandler;
import io.vertx.core.http.HttpServer; import io.vertx.core.AbstractVerticle;
import io.vertx.core.http.HttpServerOptions; import io.vertx.core.Vertx;
import io.vertx.core.net.PemKeyCertOptions; import io.vertx.core.http.HttpServer;
import io.vertx.ext.web.Router; import io.vertx.core.http.HttpServerOptions;
import io.vertx.ext.web.handler.BodyHandler; import io.vertx.core.net.PemKeyCertOptions;
import jakarta.annotation.PostConstruct; import io.vertx.ext.web.Router;
import jakarta.annotation.PreDestroy; import io.vertx.ext.web.handler.BodyHandler;
import lombok.Getter; import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j; import jakarta.annotation.PreDestroy;
import lombok.Getter;
/** import lombok.extern.slf4j.Slf4j;
* IoT 网关 HTTP 协议:接收设备上行消息
* /**
* @author 芋道源码 * IoT 网关 HTTP 协议:接收设备上行消息
*/ *
@Slf4j * @author 芋道源码
public class IotHttpUpstreamProtocol extends AbstractVerticle { */
@Slf4j
private final IotGatewayProperties.HttpProperties httpProperties; public class IotHttpUpstreamProtocol extends AbstractVerticle {
private HttpServer httpServer; private final IotGatewayProperties.HttpProperties httpProperties;
@Getter private HttpServer httpServer;
private final String serverId;
@Getter
public IotHttpUpstreamProtocol(IotGatewayProperties.HttpProperties httpProperties) { private final String serverId;
this.httpProperties = httpProperties;
this.serverId = IotDeviceMessageUtils.generateServerId(httpProperties.getServerPort()); public IotHttpUpstreamProtocol(IotGatewayProperties.HttpProperties httpProperties) {
} this.httpProperties = httpProperties;
this.serverId = IotDeviceMessageUtils.generateServerId(httpProperties.getServerPort());
@Override }
@PostConstruct
public void start() { @Override
// 创建路由 @PostConstruct
Vertx vertx = Vertx.vertx(); public void start() {
Router router = Router.router(vertx); // 创建路由
router.route().handler(BodyHandler.create()); Vertx vertx = Vertx.vertx();
Router router = Router.router(vertx);
// 创建处理器,添加路由处理器 router.route().handler(BodyHandler.create());
IotHttpAuthHandler authHandler = new IotHttpAuthHandler(this);
router.post(IotHttpAuthHandler.PATH).handler(authHandler); // 创建处理器,添加路由处理器
IotHttpUpstreamHandler upstreamHandler = new IotHttpUpstreamHandler(this); IotHttpAuthHandler authHandler = new IotHttpAuthHandler(this);
router.post(IotHttpUpstreamHandler.PATH).handler(upstreamHandler); router.post(IotHttpAuthHandler.PATH).handler(authHandler);
IotHttpUpstreamHandler upstreamHandler = new IotHttpUpstreamHandler(this);
// 启动 HTTP 服务器 router.post(IotHttpUpstreamHandler.PATH).handler(upstreamHandler);
HttpServerOptions options = new HttpServerOptions() // 3D11 单目客流计数器专用路由(仅在配置了 productKey 时启用)
.setPort(httpProperties.getServerPort()); String camera3d11ProductKey = httpProperties.getCamera3d11ProductKey();
if (Boolean.TRUE.equals(httpProperties.getSslEnabled())) { if (StrUtil.isNotBlank(camera3d11ProductKey)) {
PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions().setKeyPath(httpProperties.getSslKeyPath()) IotCameraUpstreamHandler cameraHandler = new IotCameraUpstreamHandler(this, camera3d11ProductKey);
.setCertPath(httpProperties.getSslCertPath()); router.post(IotCameraUpstreamHandler.PATH_HEARTBEAT).handler(cameraHandler);
options = options.setSsl(true).setKeyCertOptions(pemKeyCertOptions); router.post(IotCameraUpstreamHandler.PATH_DATA_UPLOAD).handler(cameraHandler);
} }
try {
httpServer = vertx.createHttpServer(options) // 启动 HTTP 服务器
.requestHandler(router) HttpServerOptions options = new HttpServerOptions()
.listen() .setPort(httpProperties.getServerPort());
.result(); if (Boolean.TRUE.equals(httpProperties.getSslEnabled())) {
log.info("[start][IoT 网关 HTTP 协议启动成功,端口:{}]", httpProperties.getServerPort()); PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions().setKeyPath(httpProperties.getSslKeyPath())
} catch (Exception e) { .setCertPath(httpProperties.getSslCertPath());
log.error("[start][IoT 网关 HTTP 协议启动失败]", e); options = options.setSsl(true).setKeyCertOptions(pemKeyCertOptions);
throw e; }
} try {
} httpServer = vertx.createHttpServer(options)
.requestHandler(router)
@Override .listen()
@PreDestroy .result();
public void stop() { log.info("[start][IoT 网关 HTTP 协议启动成功,端口:{}]", httpProperties.getServerPort());
if (httpServer != null) { } catch (Exception e) {
try { log.error("[start][IoT 网关 HTTP 协议启动失败]", e);
httpServer.close().result(); throw e;
log.info("[stop][IoT 网关 HTTP 协议已停止]"); }
} catch (Exception e) { }
log.error("[stop][IoT 网关 HTTP 协议停止失败]", e);
} @Override
} @PreDestroy
} public void stop() {
if (httpServer != null) {
} try {
httpServer.close().result();
log.info("[stop][IoT 网关 HTTP 协议已停止]");
} catch (Exception e) {
log.error("[stop][IoT 网关 HTTP 协议停止失败]", e);
}
}
}
}

View File

@@ -0,0 +1,150 @@
package com.viewsh.module.iot.gateway.protocol.http.router;
import cn.hutool.core.map.MapUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.viewsh.framework.common.util.json.JsonUtils;
import com.viewsh.module.iot.core.biz.dto.IotDeviceRespDTO;
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
import com.viewsh.module.iot.gateway.codec.camera3d11.IotCamera3D11Codec;
import com.viewsh.module.iot.gateway.codec.camera3d11.dto.Camera3D11DataUploadReqVO;
import com.viewsh.module.iot.gateway.codec.camera3d11.dto.Camera3D11HeartBeatReqVO;
import com.viewsh.module.iot.gateway.codec.camera3d11.dto.Camera3D11Resp;
import com.viewsh.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol;
import com.viewsh.module.iot.gateway.service.device.IotDeviceService;
import com.viewsh.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Handler;
import io.vertx.ext.web.RoutingContext;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.Map;
/**
* 3D11 单目客流计数器 — HTTP 上行处理器
* <p>
* 直接实现 {@link Handler},不继承 {@link IotHttpAbstractHandler}
* 因其认证逻辑依赖路径参数 (productKey/deviceName),而 3D11 使用固定路径 + body 中 sn 识别设备。
*/
@Slf4j
public class IotCameraUpstreamHandler implements Handler<RoutingContext> {
public static final String PATH_HEARTBEAT = "/api/camera/heartBeat";
public static final String PATH_DATA_UPLOAD = "/api/camera/dataUpload";
/**
* 心跳响应默认上报间隔(分钟)
*/
private static final int DEFAULT_UPLOAD_INTERVAL = 1;
private static final ZoneId ZONE_SHANGHAI = ZoneId.of("Asia/Shanghai");
private final IotHttpUpstreamProtocol protocol;
private final String productKey;
private final IotDeviceService deviceService;
private final IotDeviceMessageService deviceMessageService;
public IotCameraUpstreamHandler(IotHttpUpstreamProtocol protocol, String productKey) {
this.protocol = protocol;
this.productKey = productKey;
this.deviceService = SpringUtil.getBean(IotDeviceService.class);
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
}
@Override
public void handle(RoutingContext context) {
try {
String path = context.request().path();
if (PATH_HEARTBEAT.equals(path)) {
handleHeartBeat(context);
} else if (PATH_DATA_UPLOAD.equals(path)) {
handleDataUpload(context);
} else {
IotHttpAbstractHandler.writeResponse(context, Camera3D11Resp.error("未知路径"));
}
} catch (Exception e) {
log.error("[handle][3D11 处理异常, path={}]", context.request().path(), e);
if (!context.response().ended()) {
IotHttpAbstractHandler.writeResponse(context, Camera3D11Resp.error("服务器内部错误"));
}
}
}
/**
* 心跳处理
*/
private void handleHeartBeat(RoutingContext context) {
// 1. 解析请求
Camera3D11HeartBeatReqVO req = JsonUtils.parseObject(
context.body().buffer().getBytes(), Camera3D11HeartBeatReqVO.class);
if (req == null || req.getSn() == null) {
IotHttpAbstractHandler.writeResponse(context, Camera3D11Resp.error("参数错误"));
return;
}
// 2. 校验设备存在
IotDeviceRespDTO device = deviceService.getDeviceFromCache(productKey, req.getSn());
if (device == null) {
IotHttpAbstractHandler.writeResponse(context, Camera3D11Resp.snNotFound());
return;
}
// 3. 发送上线消息
IotDeviceMessage onlineMsg = IotDeviceMessage.buildStateUpdateOnline();
deviceMessageService.sendDeviceMessage(onlineMsg, productKey, req.getSn(), protocol.getServerId());
// 4. 构建心跳响应
LocalDate today = LocalDate.now();
long dataStartTime = today.atTime(LocalTime.of(8, 0)).atZone(ZONE_SHANGHAI).toEpochSecond();
long dataEndTime = today.atTime(LocalTime.of(23, 0)).atZone(ZONE_SHANGHAI).toEpochSecond();
long currentTime = LocalDateTime.now().atZone(ZONE_SHANGHAI).toEpochSecond();
Map<String, Object> data = MapUtil.newHashMap();
data.put("sn", req.getSn());
data.put("dataStartTime", dataStartTime);
data.put("dataEndTime", dataEndTime);
data.put("uploadInterval", DEFAULT_UPLOAD_INTERVAL);
data.put("time", currentTime);
IotHttpAbstractHandler.writeResponse(context, Camera3D11Resp.success(data));
}
/**
* 数据上报处理
*/
private void handleDataUpload(RoutingContext context) {
// 1. 解析请求
byte[] bytes = context.body().buffer().getBytes();
Camera3D11DataUploadReqVO req = JsonUtils.parseObject(bytes, Camera3D11DataUploadReqVO.class);
if (req == null || req.getSn() == null) {
IotHttpAbstractHandler.writeResponse(context, Camera3D11Resp.error("参数错误"));
return;
}
// 2. 校验设备存在
IotDeviceRespDTO device = deviceService.getDeviceFromCache(productKey, req.getSn());
if (device == null) {
IotHttpAbstractHandler.writeResponse(context, Camera3D11Resp.snNotFound());
return;
}
// 3. 解码并发送消息
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(bytes, IotCamera3D11Codec.TYPE);
if (message == null) {
IotHttpAbstractHandler.writeResponse(context, Camera3D11Resp.error("消息解码失败"));
return;
}
deviceMessageService.sendDeviceMessage(message, productKey, req.getSn(), null);
// 4. 构建响应
long currentTime = LocalDateTime.now().atZone(ZONE_SHANGHAI).toEpochSecond();
Map<String, Object> data = MapUtil.newHashMap();
data.put("sn", req.getSn());
data.put("time", currentTime);
IotHttpAbstractHandler.writeResponse(context, Camera3D11Resp.success(data));
}
}

View File

@@ -54,6 +54,7 @@ viewsh:
http: http:
enabled: true enabled: true
server-port: 8092 server-port: 8092
camera3d11-product-key: camera_3d11 # 3D11 单目客流计数器的产品 Key
# ==================================== # ====================================
# 针对引入的 EMQX 组件的配置 # 针对引入的 EMQX 组件的配置
# ==================================== # ====================================