feat(iot): 实现设备 API 接口及更新网关处理逻辑
This commit is contained in:
@@ -1,388 +1,399 @@
|
||||
package com.viewsh.module.iot.gateway.protocol.tcp.router;
|
||||
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.viewsh.module.iot.core.biz.dto.IotDeviceRespDTO;
|
||||
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import com.viewsh.module.iot.gateway.codec.jt808.IotJt808DeviceMessageCodec;
|
||||
import com.viewsh.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec;
|
||||
import com.viewsh.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec;
|
||||
import com.viewsh.module.iot.gateway.protocol.tcp.IotTcpUpstreamProtocol;
|
||||
import com.viewsh.module.iot.gateway.protocol.tcp.handler.AuthResult;
|
||||
import com.viewsh.module.iot.gateway.protocol.tcp.handler.ProtocolHandler;
|
||||
import com.viewsh.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager;
|
||||
import com.viewsh.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import io.vertx.core.Handler;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.net.NetSocket;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* TCP 上行消息处理器(重构版)
|
||||
* <p>
|
||||
* 职责:
|
||||
* 1. 管理 TCP 连接生命周期(连接建立、异常、关闭)
|
||||
* 2. 检测消息格式类型(JSON/Binary/JT808)
|
||||
* 3. 解码设备消息
|
||||
* 4. 路由到对应的协议处理器
|
||||
* 5. 管理设备认证状态
|
||||
* 6. 发送设备上线/离线消息
|
||||
* <p>
|
||||
* 设计原则:
|
||||
* - 主处理器只负责路由,不包含协议特定逻辑
|
||||
* - 协议处理器通过 Spring 自动注入,实现插件化
|
||||
* - 认证成功后,统一注册连接和发送上线消息
|
||||
* - 支持多步认证(如 JT808 的注册+鉴权)
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
||||
|
||||
private static final String CODEC_TYPE_JSON = IotTcpJsonDeviceMessageCodec.TYPE;
|
||||
private static final String CODEC_TYPE_BINARY = IotTcpBinaryDeviceMessageCodec.TYPE;
|
||||
private static final String CODEC_TYPE_JT808 = IotJt808DeviceMessageCodec.TYPE;
|
||||
|
||||
private final IotDeviceMessageService deviceMessageService;
|
||||
private final IotTcpConnectionManager connectionManager;
|
||||
private final List<ProtocolHandler> protocolHandlers;
|
||||
private final String serverId;
|
||||
|
||||
public IotTcpUpstreamHandler(IotTcpUpstreamProtocol protocol,
|
||||
IotDeviceMessageService deviceMessageService,
|
||||
IotTcpConnectionManager connectionManager,
|
||||
List<ProtocolHandler> protocolHandlers) {
|
||||
this.deviceMessageService = deviceMessageService;
|
||||
this.connectionManager = connectionManager;
|
||||
this.protocolHandlers = protocolHandlers;
|
||||
this.serverId = protocol.getServerId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(NetSocket socket) {
|
||||
String clientId = IdUtil.simpleUUID();
|
||||
log.debug("[handle][设备连接,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress());
|
||||
|
||||
// 设置异常和关闭处理器
|
||||
socket.exceptionHandler(ex -> {
|
||||
log.warn("[handle][连接异常,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress(), ex);
|
||||
cleanupConnection(socket);
|
||||
});
|
||||
|
||||
socket.closeHandler(v -> {
|
||||
log.debug("[handle][连接关闭,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress());
|
||||
cleanupConnection(socket);
|
||||
});
|
||||
|
||||
// 设置消息处理器
|
||||
socket.handler(buffer -> {
|
||||
try {
|
||||
processMessage(clientId, buffer, socket);
|
||||
} catch (Exception e) {
|
||||
log.error("[handle][消息处理失败,断开连接,客户端 ID: {},地址: {},错误: {}]",
|
||||
clientId, socket.remoteAddress(), e.getMessage(), e);
|
||||
cleanupConnection(socket);
|
||||
socket.close();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理消息
|
||||
* <p>
|
||||
* 流程:
|
||||
* 1. 检测消息格式类型(JSON/Binary/JT808)
|
||||
* 2. 解码消息
|
||||
* 3. 查找协议处理器
|
||||
* 4. 判断是否为认证消息
|
||||
* 5. 路由到协议处理器处理
|
||||
*
|
||||
* @param clientId 客户端 ID
|
||||
* @param buffer 消息
|
||||
* @param socket 网络连接
|
||||
* @throws Exception 消息解码失败时抛出异常
|
||||
*/
|
||||
private void processMessage(String clientId, Buffer buffer, NetSocket socket) throws Exception {
|
||||
// 1. 基础检查
|
||||
if (buffer == null || buffer.length() == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 获取消息格式类型
|
||||
String codecType = getMessageCodecType(buffer, socket);
|
||||
|
||||
// 3. 解码消息
|
||||
IotDeviceMessage message;
|
||||
try {
|
||||
message = deviceMessageService.decodeDeviceMessage(buffer.getBytes(), codecType);
|
||||
if (message == null) {
|
||||
throw new Exception("解码后消息为空");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// 消息格式错误时抛出异常,由上层处理连接断开
|
||||
throw new Exception("消息解码失败: " + e.getMessage(), e);
|
||||
}
|
||||
|
||||
// 4. 查找协议处理器
|
||||
ProtocolHandler handler = findProtocolHandler(message, codecType);
|
||||
if (handler == null) {
|
||||
log.warn("[processMessage][未找到协议处理器,codecType: {}, method: {}]",
|
||||
codecType, message.getMethod());
|
||||
return;
|
||||
}
|
||||
|
||||
// 5. 判断是否为认证消息
|
||||
if (isAuthenticationMessage(message)) {
|
||||
handleAuthenticationWithProtocol(clientId, message, codecType, socket, handler);
|
||||
} else {
|
||||
handleBusinessWithProtocol(clientId, message, codecType, socket, handler);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 使用协议处理器处理认证消息
|
||||
* <p>
|
||||
* 认证结果处理:
|
||||
* - SUCCESS:注册连接,发送上线消息
|
||||
* - PENDING:等待后续认证步骤(如 JT808 注册后等待鉴权)
|
||||
* - FAILURE:认证失败,不做处理(协议处理器已发送失败响应)
|
||||
*
|
||||
* @param clientId 客户端 ID
|
||||
* @param message 认证消息
|
||||
* @param codecType 消息编解码类型
|
||||
* @param socket 网络连接
|
||||
* @param handler 协议处理器
|
||||
*/
|
||||
private void handleAuthenticationWithProtocol(String clientId, IotDeviceMessage message,
|
||||
String codecType, NetSocket socket,
|
||||
ProtocolHandler handler) {
|
||||
try {
|
||||
// 委托给协议处理器
|
||||
AuthResult result = handler.handleAuthentication(clientId, message, codecType, socket);
|
||||
|
||||
// 根据认证结果处理
|
||||
if (result.isSuccess()) {
|
||||
// 认证成功:注册连接并发送上线消息
|
||||
registerConnection(socket, result.getDevice(), clientId, codecType);
|
||||
sendOnlineMessage(result.getDevice());
|
||||
|
||||
log.info("[handleAuthentication][认证成功,设备: {}, 协议: {}]",
|
||||
result.getDevice().getDeviceName(), handler.getProtocolType());
|
||||
|
||||
} else if (result.isPending()) {
|
||||
// 认证待定:等待后续认证步骤(如 JT808 注册后等待鉴权)
|
||||
log.info("[handleAuthentication][认证待定,设备: {}, 协议: {}, 消息: {}]",
|
||||
result.getDevice() != null ? result.getDevice().getDeviceName() : "unknown",
|
||||
handler.getProtocolType(), result.getMessage());
|
||||
|
||||
} else {
|
||||
// 认证失败:协议处理器已发送失败响应,这里只记录日志
|
||||
log.warn("[handleAuthentication][认证失败,clientId: {}, 协议: {}, 原因: {}]",
|
||||
clientId, handler.getProtocolType(), result.getMessage());
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[handleAuthentication][认证异常,clientId: {}, 协议: {}]",
|
||||
clientId, handler.getProtocolType(), e);
|
||||
handler.sendResponse(socket, message, false, "认证异常", codecType);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 使用协议处理器处理业务消息
|
||||
* <p>
|
||||
* 前置条件:设备已认证
|
||||
*
|
||||
* @param clientId 客户端 ID
|
||||
* @param message 业务消息
|
||||
* @param codecType 消息编解码类型
|
||||
* @param socket 网络连接
|
||||
* @param handler 协议处理器
|
||||
*/
|
||||
private void handleBusinessWithProtocol(String clientId, IotDeviceMessage message,
|
||||
String codecType, NetSocket socket,
|
||||
ProtocolHandler handler) {
|
||||
try {
|
||||
// 1. 检查认证状态
|
||||
if (connectionManager.isNotAuthenticated(socket)) {
|
||||
log.warn("[handleBusinessMessage][设备未认证,clientId: {}]", clientId);
|
||||
handler.sendResponse(socket, message, false, "请先进行认证", codecType);
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 获取连接信息
|
||||
IotTcpConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket);
|
||||
if (connectionInfo == null) {
|
||||
log.error("[handleBusinessMessage][连接信息不存在,clientId: {}]", clientId);
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. 委托给协议处理器处理业务消息
|
||||
handler.handleBusinessMessage(
|
||||
clientId,
|
||||
message,
|
||||
codecType,
|
||||
socket,
|
||||
connectionInfo.getProductKey(),
|
||||
connectionInfo.getDeviceName(),
|
||||
serverId
|
||||
);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[handleBusinessMessage][业务消息处理异常,clientId: {}]", clientId, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取消息编解码类型
|
||||
* <p>
|
||||
* 检测优先级:
|
||||
* 1. 如果已认证,使用缓存的编解码类型
|
||||
* 2. 未认证时,通过消息格式自动检测:
|
||||
* - JT808:首尾标识符 0x7e
|
||||
* - Binary:魔术字 0x7E
|
||||
* - JSON:默认
|
||||
*
|
||||
* @param buffer 消息
|
||||
* @param socket 网络连接
|
||||
* @return 消息编解码类型
|
||||
*/
|
||||
private String getMessageCodecType(Buffer buffer, NetSocket socket) {
|
||||
// 1. 如果已认证,优先使用缓存的编解码类型
|
||||
IotTcpConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket);
|
||||
if (connectionInfo != null && connectionInfo.isAuthenticated() &&
|
||||
StrUtil.isNotBlank(connectionInfo.getCodecType())) {
|
||||
return connectionInfo.getCodecType();
|
||||
}
|
||||
|
||||
// 2. 未认证时检测消息格式类型
|
||||
byte[] data = buffer.getBytes();
|
||||
|
||||
// 2.1 检测是否为 JT808 格式(首尾标识符 0x7e)
|
||||
if (IotJt808DeviceMessageCodec.isJt808Format(data)) {
|
||||
return CODEC_TYPE_JT808;
|
||||
}
|
||||
|
||||
// 2.2 检测是否为自定义二进制格式(魔术字 0x7E)
|
||||
if (IotTcpBinaryDeviceMessageCodec.isBinaryFormatQuick(data)) {
|
||||
return CODEC_TYPE_BINARY;
|
||||
}
|
||||
|
||||
// 2.3 默认为 JSON 格式
|
||||
return CODEC_TYPE_JSON;
|
||||
}
|
||||
|
||||
/**
|
||||
* 查找协议处理器
|
||||
* <p>
|
||||
* 遍历所有协议处理器,返回第一个能处理该消息的处理器
|
||||
*
|
||||
* @param message 设备消息
|
||||
* @param codecType 消息编解码类型
|
||||
* @return 协议处理器,未找到返回 null
|
||||
*/
|
||||
private ProtocolHandler findProtocolHandler(IotDeviceMessage message, String codecType) {
|
||||
return protocolHandlers.stream()
|
||||
.filter(handler -> handler.canHandle(message, codecType))
|
||||
.findFirst()
|
||||
.orElse(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否为认证消息
|
||||
* <p>
|
||||
* 认证消息包括:
|
||||
* - 标准认证:auth
|
||||
* - JT808 注册:jt808.terminal.register
|
||||
* - JT808 鉴权:jt808.terminal.auth
|
||||
*
|
||||
* @param message 设备消息
|
||||
* @return true-是认证消息,false-不是
|
||||
*/
|
||||
private boolean isAuthenticationMessage(IotDeviceMessage message) {
|
||||
String method = message.getMethod();
|
||||
return "auth".equals(method)
|
||||
|| "jt808.terminal.register".equals(method)
|
||||
|| "jt808.terminal.auth".equals(method);
|
||||
}
|
||||
|
||||
/**
|
||||
* 注册连接信息
|
||||
* <p>
|
||||
* 将设备连接信息注册到连接管理器,包括:
|
||||
* - 设备 ID
|
||||
* - 产品 Key
|
||||
* - 设备名称
|
||||
* - 客户端 ID
|
||||
* - 编解码类型
|
||||
* - 认证状态
|
||||
*
|
||||
* @param socket 网络连接
|
||||
* @param device 设备信息
|
||||
* @param clientId 客户端 ID
|
||||
* @param codecType 消息编解码类型
|
||||
*/
|
||||
private void registerConnection(NetSocket socket, IotDeviceRespDTO device,
|
||||
String clientId, String codecType) {
|
||||
IotTcpConnectionManager.ConnectionInfo connectionInfo = new IotTcpConnectionManager.ConnectionInfo()
|
||||
.setDeviceId(device.getId())
|
||||
.setProductKey(device.getProductKey())
|
||||
.setDeviceName(device.getDeviceName())
|
||||
.setClientId(clientId)
|
||||
.setCodecType(codecType)
|
||||
.setAuthenticated(true);
|
||||
|
||||
// 注册连接(如果设备已有其他连接,会自动断开旧连接)
|
||||
connectionManager.registerConnection(socket, device.getId(), connectionInfo);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送设备上线消息
|
||||
* <p>
|
||||
* 设备认证成功后,发送上线消息到消息总线,通知业务层设备已上线
|
||||
*
|
||||
* @param device 设备信息
|
||||
*/
|
||||
private void sendOnlineMessage(IotDeviceRespDTO device) {
|
||||
try {
|
||||
IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline();
|
||||
deviceMessageService.sendDeviceMessage(onlineMessage, device.getProductKey(),
|
||||
device.getDeviceName(), serverId);
|
||||
|
||||
log.debug("[sendOnlineMessage][发送上线消息成功,设备: {}]", device.getDeviceName());
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[sendOnlineMessage][发送上线消息失败,设备: {}]", device.getDeviceName(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理连接
|
||||
* <p>
|
||||
* 连接关闭或异常时,清理连接信息并发送离线消息
|
||||
*
|
||||
* @param socket 网络连接
|
||||
*/
|
||||
private void cleanupConnection(NetSocket socket) {
|
||||
try {
|
||||
// 1. 发送离线消息(如果已认证)
|
||||
IotTcpConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket);
|
||||
if (connectionInfo != null && connectionInfo.isAuthenticated()) {
|
||||
IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
|
||||
deviceMessageService.sendDeviceMessage(offlineMessage, connectionInfo.getProductKey(),
|
||||
connectionInfo.getDeviceName(), serverId);
|
||||
|
||||
log.debug("[cleanupConnection][发送离线消息成功,设备: {}]", connectionInfo.getDeviceName());
|
||||
}
|
||||
|
||||
// 2. 注销连接
|
||||
connectionManager.unregisterConnection(socket);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[cleanupConnection][清理连接失败]", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
package com.viewsh.module.iot.gateway.protocol.tcp.router;
|
||||
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.viewsh.module.iot.core.biz.dto.IotDeviceRespDTO;
|
||||
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import com.viewsh.module.iot.gateway.codec.jt808.IotJt808DeviceMessageCodec;
|
||||
import com.viewsh.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec;
|
||||
import com.viewsh.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec;
|
||||
import com.viewsh.module.iot.gateway.protocol.tcp.IotTcpUpstreamProtocol;
|
||||
import com.viewsh.module.iot.gateway.protocol.tcp.handler.AuthResult;
|
||||
import com.viewsh.module.iot.gateway.protocol.tcp.handler.ProtocolHandler;
|
||||
import com.viewsh.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager;
|
||||
import com.viewsh.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import io.vertx.core.Handler;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.net.NetSocket;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* TCP 上行消息处理器(重构版)
|
||||
* <p>
|
||||
* 职责:
|
||||
* 1. 管理 TCP 连接生命周期(连接建立、异常、关闭)
|
||||
* 2. 检测消息格式类型(JSON/Binary/JT808)
|
||||
* 3. 解码设备消息
|
||||
* 4. 路由到对应的协议处理器
|
||||
* 5. 管理设备认证状态
|
||||
* 6. 发送设备上线/离线消息
|
||||
* <p>
|
||||
* 设计原则:
|
||||
* - 主处理器只负责路由,不包含协议特定逻辑
|
||||
* - 协议处理器通过 Spring 自动注入,实现插件化
|
||||
* - 认证成功后,统一注册连接和发送上线消息
|
||||
* - 支持多步认证(如 JT808 的注册+鉴权)
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
||||
|
||||
private static final String CODEC_TYPE_JSON = IotTcpJsonDeviceMessageCodec.TYPE;
|
||||
private static final String CODEC_TYPE_BINARY = IotTcpBinaryDeviceMessageCodec.TYPE;
|
||||
private static final String CODEC_TYPE_JT808 = IotJt808DeviceMessageCodec.TYPE;
|
||||
|
||||
private final IotDeviceMessageService deviceMessageService;
|
||||
private final IotTcpConnectionManager connectionManager;
|
||||
private final List<ProtocolHandler> protocolHandlers;
|
||||
private final String serverId;
|
||||
|
||||
public IotTcpUpstreamHandler(IotTcpUpstreamProtocol protocol,
|
||||
IotDeviceMessageService deviceMessageService,
|
||||
IotTcpConnectionManager connectionManager,
|
||||
List<ProtocolHandler> protocolHandlers) {
|
||||
this.deviceMessageService = deviceMessageService;
|
||||
this.connectionManager = connectionManager;
|
||||
this.protocolHandlers = protocolHandlers;
|
||||
this.serverId = protocol.getServerId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(NetSocket socket) {
|
||||
String clientId = IdUtil.simpleUUID();
|
||||
log.debug("[handle][设备连接,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress());
|
||||
|
||||
// 设置异常和关闭处理器
|
||||
socket.exceptionHandler(ex -> {
|
||||
log.warn("[handle][连接异常,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress(), ex);
|
||||
cleanupConnection(socket);
|
||||
});
|
||||
|
||||
socket.closeHandler(v -> {
|
||||
log.debug("[handle][连接关闭,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress());
|
||||
cleanupConnection(socket);
|
||||
});
|
||||
|
||||
// 设置消息处理器
|
||||
socket.handler(buffer -> {
|
||||
try {
|
||||
processMessage(clientId, buffer, socket);
|
||||
} catch (Exception e) {
|
||||
log.error("[handle][消息处理失败,断开连接,客户端 ID: {},地址: {},错误: {}]",
|
||||
clientId, socket.remoteAddress(), e.getMessage(), e);
|
||||
cleanupConnection(socket);
|
||||
socket.close();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理消息
|
||||
* <p>
|
||||
* 流程:
|
||||
* 1. 检测消息格式类型(JSON/Binary/JT808)
|
||||
* 2. 解码消息
|
||||
* 3. 查找协议处理器
|
||||
* 4. 判断是否为认证消息
|
||||
* 5. 路由到协议处理器处理
|
||||
*
|
||||
* @param clientId 客户端 ID
|
||||
* @param buffer 消息
|
||||
* @param socket 网络连接
|
||||
* @throws Exception 消息解码失败时抛出异常
|
||||
*/
|
||||
private void processMessage(String clientId, Buffer buffer, NetSocket socket) throws Exception {
|
||||
// 1. 基础检查
|
||||
if (buffer == null || buffer.length() == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 获取消息格式类型
|
||||
String codecType = getMessageCodecType(buffer, socket);
|
||||
if (codecType == null) {
|
||||
log.warn("[processMessage][未知消息格式,断开连接,clientId: {},数据开头: {}]",
|
||||
clientId, buffer.length() > 20 ? buffer.getString(0, 20) : buffer.toString());
|
||||
socket.close();
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. 解码消息
|
||||
IotDeviceMessage message;
|
||||
try {
|
||||
message = deviceMessageService.decodeDeviceMessage(buffer.getBytes(), codecType);
|
||||
if (message == null) {
|
||||
throw new Exception("解码后消息为空");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// 消息格式错误时抛出异常,由上层处理连接断开
|
||||
throw new Exception("消息解码失败: " + e.getMessage(), e);
|
||||
}
|
||||
|
||||
// 4. 查找协议处理器
|
||||
ProtocolHandler handler = findProtocolHandler(message, codecType);
|
||||
if (handler == null) {
|
||||
log.warn("[processMessage][未找到协议处理器,codecType: {}, method: {}]",
|
||||
codecType, message.getMethod());
|
||||
return;
|
||||
}
|
||||
|
||||
// 5. 判断是否为认证消息
|
||||
if (isAuthenticationMessage(message)) {
|
||||
handleAuthenticationWithProtocol(clientId, message, codecType, socket, handler);
|
||||
} else {
|
||||
handleBusinessWithProtocol(clientId, message, codecType, socket, handler);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 使用协议处理器处理认证消息
|
||||
* <p>
|
||||
* 认证结果处理:
|
||||
* - SUCCESS:注册连接,发送上线消息
|
||||
* - PENDING:等待后续认证步骤(如 JT808 注册后等待鉴权)
|
||||
* - FAILURE:认证失败,不做处理(协议处理器已发送失败响应)
|
||||
*
|
||||
* @param clientId 客户端 ID
|
||||
* @param message 认证消息
|
||||
* @param codecType 消息编解码类型
|
||||
* @param socket 网络连接
|
||||
* @param handler 协议处理器
|
||||
*/
|
||||
private void handleAuthenticationWithProtocol(String clientId, IotDeviceMessage message,
|
||||
String codecType, NetSocket socket,
|
||||
ProtocolHandler handler) {
|
||||
try {
|
||||
// 委托给协议处理器
|
||||
AuthResult result = handler.handleAuthentication(clientId, message, codecType, socket);
|
||||
|
||||
// 根据认证结果处理
|
||||
if (result.isSuccess()) {
|
||||
// 认证成功:注册连接并发送上线消息
|
||||
registerConnection(socket, result.getDevice(), clientId, codecType);
|
||||
sendOnlineMessage(result.getDevice());
|
||||
|
||||
log.info("[handleAuthentication][认证成功,设备: {}, 协议: {}]",
|
||||
result.getDevice().getDeviceName(), handler.getProtocolType());
|
||||
|
||||
} else if (result.isPending()) {
|
||||
// 认证待定:等待后续认证步骤(如 JT808 注册后等待鉴权)
|
||||
log.info("[handleAuthentication][认证待定,设备: {}, 协议: {}, 消息: {}]",
|
||||
result.getDevice() != null ? result.getDevice().getDeviceName() : "unknown",
|
||||
handler.getProtocolType(), result.getMessage());
|
||||
|
||||
} else {
|
||||
// 认证失败:协议处理器已发送失败响应,这里只记录日志
|
||||
log.warn("[handleAuthentication][认证失败,clientId: {}, 协议: {}, 原因: {}]",
|
||||
clientId, handler.getProtocolType(), result.getMessage());
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[handleAuthentication][认证异常,clientId: {}, 协议: {}]",
|
||||
clientId, handler.getProtocolType(), e);
|
||||
handler.sendResponse(socket, message, false, "认证异常", codecType);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 使用协议处理器处理业务消息
|
||||
* <p>
|
||||
* 前置条件:设备已认证
|
||||
*
|
||||
* @param clientId 客户端 ID
|
||||
* @param message 业务消息
|
||||
* @param codecType 消息编解码类型
|
||||
* @param socket 网络连接
|
||||
* @param handler 协议处理器
|
||||
*/
|
||||
private void handleBusinessWithProtocol(String clientId, IotDeviceMessage message,
|
||||
String codecType, NetSocket socket,
|
||||
ProtocolHandler handler) {
|
||||
try {
|
||||
// 1. 检查认证状态
|
||||
if (connectionManager.isNotAuthenticated(socket)) {
|
||||
log.warn("[handleBusinessMessage][设备未认证,clientId: {}]", clientId);
|
||||
handler.sendResponse(socket, message, false, "请先进行认证", codecType);
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 获取连接信息
|
||||
IotTcpConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket);
|
||||
if (connectionInfo == null) {
|
||||
log.error("[handleBusinessMessage][连接信息不存在,clientId: {}]", clientId);
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. 委托给协议处理器处理业务消息
|
||||
handler.handleBusinessMessage(
|
||||
clientId,
|
||||
message,
|
||||
codecType,
|
||||
socket,
|
||||
connectionInfo.getProductKey(),
|
||||
connectionInfo.getDeviceName(),
|
||||
serverId);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[handleBusinessMessage][业务消息处理异常,clientId: {}]", clientId, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取消息编解码类型
|
||||
* <p>
|
||||
* 检测优先级:
|
||||
* 1. 如果已认证,使用缓存的编解码类型
|
||||
* 2. 未认证时,通过消息格式自动检测:
|
||||
* - JT808:首尾标识符 0x7e
|
||||
* - Binary:魔术字 0x7E
|
||||
* - JSON:默认
|
||||
*
|
||||
* @param buffer 消息
|
||||
* @param socket 网络连接
|
||||
* @return 消息编解码类型
|
||||
*/
|
||||
private String getMessageCodecType(Buffer buffer, NetSocket socket) {
|
||||
// 1. 如果已认证,优先使用缓存的编解码类型
|
||||
IotTcpConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket);
|
||||
if (connectionInfo != null && connectionInfo.isAuthenticated() &&
|
||||
StrUtil.isNotBlank(connectionInfo.getCodecType())) {
|
||||
return connectionInfo.getCodecType();
|
||||
}
|
||||
|
||||
// 2. 未认证时检测消息格式类型
|
||||
byte[] data = buffer.getBytes();
|
||||
|
||||
// 2.1 检测是否为 JT808 格式(首尾标识符 0x7e)
|
||||
if (IotJt808DeviceMessageCodec.isJt808Format(data)) {
|
||||
return CODEC_TYPE_JT808;
|
||||
}
|
||||
|
||||
// 2.2 检测是否为自定义二进制格式(魔术字 0x7E)
|
||||
if (IotTcpBinaryDeviceMessageCodec.isBinaryFormatQuick(data)) {
|
||||
return CODEC_TYPE_BINARY;
|
||||
}
|
||||
|
||||
// 2.3 检测是否为 JSON 格式(以 { 或 [ 开头)
|
||||
String jsonStr = StrUtil.utf8Str(data).trim();
|
||||
if (StrUtil.startWithAny(jsonStr, "{", "[")) {
|
||||
return CODEC_TYPE_JSON;
|
||||
}
|
||||
|
||||
// 2.4 未知格式
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 查找协议处理器
|
||||
* <p>
|
||||
* 遍历所有协议处理器,返回第一个能处理该消息的处理器
|
||||
*
|
||||
* @param message 设备消息
|
||||
* @param codecType 消息编解码类型
|
||||
* @return 协议处理器,未找到返回 null
|
||||
*/
|
||||
private ProtocolHandler findProtocolHandler(IotDeviceMessage message, String codecType) {
|
||||
return protocolHandlers.stream()
|
||||
.filter(handler -> handler.canHandle(message, codecType))
|
||||
.findFirst()
|
||||
.orElse(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否为认证消息
|
||||
* <p>
|
||||
* 认证消息包括:
|
||||
* - 标准认证:auth
|
||||
* - JT808 注册:jt808.terminal.register
|
||||
* - JT808 鉴权:jt808.terminal.auth
|
||||
*
|
||||
* @param message 设备消息
|
||||
* @return true-是认证消息,false-不是
|
||||
*/
|
||||
private boolean isAuthenticationMessage(IotDeviceMessage message) {
|
||||
String method = message.getMethod();
|
||||
return "auth".equals(method)
|
||||
|| "jt808.terminal.register".equals(method)
|
||||
|| "jt808.terminal.auth".equals(method);
|
||||
}
|
||||
|
||||
/**
|
||||
* 注册连接信息
|
||||
* <p>
|
||||
* 将设备连接信息注册到连接管理器,包括:
|
||||
* - 设备 ID
|
||||
* - 产品 Key
|
||||
* - 设备名称
|
||||
* - 客户端 ID
|
||||
* - 编解码类型
|
||||
* - 认证状态
|
||||
*
|
||||
* @param socket 网络连接
|
||||
* @param device 设备信息
|
||||
* @param clientId 客户端 ID
|
||||
* @param codecType 消息编解码类型
|
||||
*/
|
||||
private void registerConnection(NetSocket socket, IotDeviceRespDTO device,
|
||||
String clientId, String codecType) {
|
||||
IotTcpConnectionManager.ConnectionInfo connectionInfo = new IotTcpConnectionManager.ConnectionInfo()
|
||||
.setDeviceId(device.getId())
|
||||
.setProductKey(device.getProductKey())
|
||||
.setDeviceName(device.getDeviceName())
|
||||
.setClientId(clientId)
|
||||
.setCodecType(codecType)
|
||||
.setAuthenticated(true);
|
||||
|
||||
// 注册连接(如果设备已有其他连接,会自动断开旧连接)
|
||||
connectionManager.registerConnection(socket, device.getId(), connectionInfo);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送设备上线消息
|
||||
* <p>
|
||||
* 设备认证成功后,发送上线消息到消息总线,通知业务层设备已上线
|
||||
*
|
||||
* @param device 设备信息
|
||||
*/
|
||||
private void sendOnlineMessage(IotDeviceRespDTO device) {
|
||||
try {
|
||||
IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline();
|
||||
deviceMessageService.sendDeviceMessage(onlineMessage, device.getProductKey(),
|
||||
device.getDeviceName(), serverId);
|
||||
|
||||
log.debug("[sendOnlineMessage][发送上线消息成功,设备: {}]", device.getDeviceName());
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[sendOnlineMessage][发送上线消息失败,设备: {}]", device.getDeviceName(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理连接
|
||||
* <p>
|
||||
* 连接关闭或异常时,清理连接信息并发送离线消息
|
||||
*
|
||||
* @param socket 网络连接
|
||||
*/
|
||||
private void cleanupConnection(NetSocket socket) {
|
||||
try {
|
||||
// 1. 发送离线消息(如果已认证)
|
||||
IotTcpConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket);
|
||||
if (connectionInfo != null && connectionInfo.isAuthenticated()) {
|
||||
IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
|
||||
deviceMessageService.sendDeviceMessage(offlineMessage, connectionInfo.getProductKey(),
|
||||
connectionInfo.getDeviceName(), serverId);
|
||||
|
||||
log.debug("[cleanupConnection][发送离线消息成功,设备: {}]", connectionInfo.getDeviceName());
|
||||
}
|
||||
|
||||
// 2. 注销连接
|
||||
connectionManager.unregisterConnection(socket);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[cleanupConnection][清理连接失败]", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user