chore: JT808/2013协议解析融合到现有TCP连接体系,对这个协议的注册鉴权做了定制化改造(注册返回鉴权码(返回phone),鉴权凭借鉴权码)

This commit is contained in:
lzh
2025-12-26 14:26:30 +08:00
parent 1886afc150
commit d1d7eec5c9
9 changed files with 1340 additions and 223 deletions

View File

@@ -12,10 +12,13 @@ import com.viewshanghai.module.iot.gateway.protocol.mqtt.manager.IotMqttConnecti
import com.viewshanghai.module.iot.gateway.protocol.mqtt.router.IotMqttDownstreamHandler;
import com.viewshanghai.module.iot.gateway.protocol.tcp.IotTcpDownstreamSubscriber;
import com.viewshanghai.module.iot.gateway.protocol.tcp.IotTcpUpstreamProtocol;
import com.viewshanghai.module.iot.gateway.protocol.tcp.handler.ProtocolHandler;
import com.viewshanghai.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager;
import com.viewshanghai.module.iot.gateway.service.device.IotDeviceService;
import com.viewshanghai.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Vertx;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -97,9 +100,10 @@ public class IotGatewayConfiguration {
IotDeviceService deviceService,
IotDeviceMessageService messageService,
IotTcpConnectionManager connectionManager,
List<ProtocolHandler> protocolHandlers,
Vertx tcpVertx) {
return new IotTcpUpstreamProtocol(gatewayProperties.getProtocol().getTcp(),
deviceService, messageService, connectionManager, tcpVertx);
deviceService, messageService, connectionManager, protocolHandlers, tcpVertx);
}
@Bean

View File

@@ -2,6 +2,7 @@ package com.viewshanghai.module.iot.gateway.protocol.tcp;
import com.viewshanghai.module.iot.core.util.IotDeviceMessageUtils;
import com.viewshanghai.module.iot.gateway.config.IotGatewayProperties;
import com.viewshanghai.module.iot.gateway.protocol.tcp.handler.ProtocolHandler;
import com.viewshanghai.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager;
import com.viewshanghai.module.iot.gateway.protocol.tcp.router.IotTcpUpstreamHandler;
import com.viewshanghai.module.iot.gateway.service.device.IotDeviceService;
@@ -15,6 +16,8 @@ import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
/**
* IoT 网关 TCP 协议:接收设备上行消息
*
@@ -31,6 +34,8 @@ public class IotTcpUpstreamProtocol {
private final IotTcpConnectionManager connectionManager;
private final List<ProtocolHandler> protocolHandlers;
private final Vertx vertx;
@Getter
@@ -42,11 +47,13 @@ public class IotTcpUpstreamProtocol {
IotDeviceService deviceService,
IotDeviceMessageService messageService,
IotTcpConnectionManager connectionManager,
List<ProtocolHandler> protocolHandlers,
Vertx vertx) {
this.tcpProperties = tcpProperties;
this.deviceService = deviceService;
this.messageService = messageService;
this.connectionManager = connectionManager;
this.protocolHandlers = protocolHandlers;
this.vertx = vertx;
this.serverId = IotDeviceMessageUtils.generateServerId(tcpProperties.getPort());
}
@@ -70,8 +77,8 @@ public class IotTcpUpstreamProtocol {
// 创建服务器并设置连接处理器
tcpServer = vertx.createNetServer(options);
tcpServer.connectHandler(socket -> {
IotTcpUpstreamHandler handler = new IotTcpUpstreamHandler(this, messageService, deviceService,
connectionManager);
IotTcpUpstreamHandler handler = new IotTcpUpstreamHandler(this, messageService,
connectionManager, protocolHandlers);
handler.handle(socket);
});

View File

@@ -0,0 +1,127 @@
package com.viewshanghai.module.iot.gateway.protocol.tcp.handler;
import cn.hutool.core.map.MapUtil;
import com.viewshanghai.module.iot.core.mq.message.IotDeviceMessage;
import com.viewshanghai.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;
import lombok.extern.slf4j.Slf4j;
/**
* 协议处理器抽象基类
* <p>
* 提供协议处理器的通用功能实现:
* 1. 消息编解码服务注入
* 2. 通用响应发送方法
* 3. 日志记录
* <p>
* 子类只需实现协议特定的认证和业务逻辑即可。
*
* @author lzh
*/
@Slf4j
public abstract class AbstractProtocolHandler implements ProtocolHandler {
/**
* 标准认证方法名
*/
protected static final String AUTH_METHOD = "auth";
/**
* 设备消息服务(用于编解码)
*/
protected final IotDeviceMessageService deviceMessageService;
protected AbstractProtocolHandler(IotDeviceMessageService deviceMessageService) {
this.deviceMessageService = deviceMessageService;
}
/**
* 默认响应发送实现
* <p>
* 发送标准格式的响应消息:
* {
* "success": true/false,
* "message": "消息内容"
* }
* <p>
* 子类可以重写此方法以实现协议特定的响应格式。
*
* @param socket 网络连接
* @param originalMessage 原始请求消息
* @param success 是否成功
* @param message 响应消息
* @param codecType 消息编解码类型
*/
@Override
public void sendResponse(NetSocket socket, IotDeviceMessage originalMessage,
boolean success, String message, String codecType) {
try {
// 构建响应数据
Object responseData = MapUtil.<String, Object>builder()
.put("success", success)
.put("message", message)
.build();
// 构建响应消息
int code = success ? 0 : 401;
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(
originalMessage.getRequestId(),
originalMessage.getMethod(),
responseData,
code,
message
);
// 编码并发送
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType);
socket.write(Buffer.buffer(encodedData));
log.debug("[sendResponse][发送响应成功,协议: {}, success: {}, message: {}]",
getProtocolType(), success, message);
} catch (Exception e) {
log.error("[sendResponse][发送响应失败,协议: {}, requestId: {}]",
getProtocolType(), originalMessage.getRequestId(), e);
}
}
/**
* 发送成功响应(便捷方法)
*
* @param socket 网络连接
* @param originalMessage 原始请求消息
* @param message 成功消息
* @param codecType 消息编解码类型
*/
protected void sendSuccessResponse(NetSocket socket, IotDeviceMessage originalMessage,
String message, String codecType) {
sendResponse(socket, originalMessage, true, message, codecType);
}
/**
* 发送失败响应(便捷方法)
*
* @param socket 网络连接
* @param originalMessage 原始请求消息
* @param message 失败消息
* @param codecType 消息编解码类型
*/
protected void sendErrorResponse(NetSocket socket, IotDeviceMessage originalMessage,
String message, String codecType) {
sendResponse(socket, originalMessage, false, message, codecType);
}
/**
* 判断是否为认证消息
* <p>
* 默认实现:判断 method 是否为 "auth"
* 子类可以重写以支持协议特定的认证方法名
*
* @param message 设备消息
* @return true-是认证消息false-不是
*/
protected boolean isAuthenticationMessage(IotDeviceMessage message) {
return AUTH_METHOD.equals(message.getMethod());
}
}

View File

@@ -0,0 +1,146 @@
package com.viewshanghai.module.iot.gateway.protocol.tcp.handler;
import com.viewshanghai.module.iot.core.biz.dto.IotDeviceRespDTO;
import lombok.Data;
import lombok.experimental.Accessors;
/**
* 认证结果封装类
* <p>
* 用于协议处理器返回认证处理结果,包含:
* - 认证是否成功
* - 设备信息(认证成功时)
* - 错误消息(认证失败时)
* - 认证状态pending 表示需要多步认证)
* <p>
* 使用场景:
* 1. 标准协议:单次认证,返回 success 或 failure
* 2. JT808两步认证
* - 注册阶段:返回 pending注册成功但未完成认证
* - 鉴权阶段:返回 success 或 failure
* 3. 免认证协议:直接返回 success
*
* @author lzh
*/
@Data
@Accessors(chain = true)
public class AuthResult {
/**
* 认证状态枚举
*/
public enum Status {
/**
* 认证成功:设备已完成认证,可以发送业务消息
*/
SUCCESS,
/**
* 认证失败:设备认证失败,连接应该断开或等待重试
*/
FAILURE,
/**
* 认证待定:设备部分认证成功,需要继续后续步骤
* <p>
* 例如 JT808 的注册阶段:注册成功但还需要鉴权
*/
PENDING
}
/**
* 认证状态
*/
private Status status;
/**
* 响应消息(用于日志和错误提示)
*/
private String message;
/**
* 设备信息(认证成功或 PENDING 时必须提供)
*/
private IotDeviceRespDTO device;
/**
* 协议特定的附加数据
* <p>
* 用于协议处理器在多步认证中传递状态信息。
* 例如 JT808 可以在注册阶段存储鉴权码,鉴权阶段取出验证。
*/
private Object additionalData;
// ========== 便捷构造方法 ==========
/**
* 创建认证成功结果
*
* @param device 设备信息
* @param message 成功消息
* @return 认证成功结果
*/
public static AuthResult success(IotDeviceRespDTO device, String message) {
return new AuthResult()
.setStatus(Status.SUCCESS)
.setDevice(device)
.setMessage(message);
}
/**
* 创建认证失败结果
*
* @param message 失败原因
* @return 认证失败结果
*/
public static AuthResult failure(String message) {
return new AuthResult()
.setStatus(Status.FAILURE)
.setMessage(message);
}
/**
* 创建认证待定结果(用于多步认证)
*
* @param device 设备信息
* @param message 待定消息
* @return 认证待定结果
*/
public static AuthResult pending(IotDeviceRespDTO device, String message) {
return new AuthResult()
.setStatus(Status.PENDING)
.setDevice(device)
.setMessage(message);
}
// ========== 便捷判断方法 ==========
/**
* 是否认证成功
*/
public boolean isSuccess() {
return Status.SUCCESS == status;
}
/**
* 是否认证失败
*/
public boolean isFailure() {
return Status.FAILURE == status;
}
/**
* 是否认证待定(需要继续后续步骤)
*/
public boolean isPending() {
return Status.PENDING == status;
}
/**
* 是否已完成认证(成功或失败,不包括待定)
*/
public boolean isCompleted() {
return status == Status.SUCCESS || status == Status.FAILURE;
}
}

View File

@@ -0,0 +1,536 @@
package com.viewshanghai.module.iot.gateway.protocol.tcp.handler;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.viewshanghai.framework.common.pojo.CommonResult;
import com.viewshanghai.module.iot.core.biz.IotDeviceCommonApi;
import com.viewshanghai.module.iot.core.biz.dto.IotDeviceGetReqDTO;
import com.viewshanghai.module.iot.core.biz.dto.IotDeviceRespDTO;
import com.viewshanghai.module.iot.core.mq.message.IotDeviceMessage;
import com.viewshanghai.module.iot.gateway.codec.jt808.IotJt808DeviceMessageCodec;
import com.viewshanghai.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* JT808 协议处理器
* <p>
* 实现 JT808 部标协议的完整认证流程:
* 1. 终端注册0x0100设备首次连接时注册平台返回鉴权码
* 2. 终端鉴权0x0102设备使用鉴权码进行鉴权鉴权成功后可发送业务消息
* <p>
* 认证策略(基于设备配置的 authType
* - SECRET一机一密使用设备的 deviceSecret 作为鉴权码
* - PRODUCT_SECRET一型一密使用产品的 productSecret 作为鉴权码
* - NONE免鉴权使用终端手机号作为鉴权码兼容模式
* <p>
* 设计说明:
* - 基于 demo 项目com/iot/transport/jt808的实现逻辑
* - 注册和鉴权分两步完成,符合 JT808 标准流程
* - 鉴权码在注册阶段生成并缓存,鉴权阶段验证后清除
* - 支持设备级和产品级认证类型配置(设备级优先)
*
* @author lzh
*/
@Slf4j
@Component
public class Jt808ProtocolHandler extends AbstractProtocolHandler {
private static final String CODEC_TYPE_JT808 = IotJt808DeviceMessageCodec.TYPE;
/**
* JT808 消息方法名
*/
private static final String METHOD_REGISTER = "jt808.terminal.register"; // 终端注册 (0x0100)
private static final String METHOD_AUTH = "jt808.terminal.auth"; // 终端鉴权 (0x0102)
/**
* 鉴权码缓存过期时间(分钟)
* <p>
* 设备在注册后需要在此时间内完成鉴权,否则需要重新注册。
* Redis 会自动清理过期的鉴权码,无需手动管理。
*/
private static final int AUTH_TOKEN_EXPIRE_MINUTES = 5;
private final String JT808_AUTH_TOKEN = "iot:jt808_auth_token:%s";
/**
* Redis 模板(用于鉴权码缓存)
*/
@Resource
private StringRedisTemplate stringRedisTemplate;
private final IotDeviceCommonApi deviceApi;
public Jt808ProtocolHandler(IotDeviceMessageService deviceMessageService) {
super(deviceMessageService);
// 使用 SpringUtil 延迟获取,避免循环依赖
this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
}
@Override
public String getProtocolType() {
return "JT808";
}
@Override
public boolean canHandle(IotDeviceMessage message, String codecType) {
// 只处理 JT808 编解码类型的消息
return CODEC_TYPE_JT808.equals(codecType);
}
@Override
public AuthResult handleAuthentication(String clientId, IotDeviceMessage message,
String codecType, NetSocket socket) {
String method = message.getMethod();
// 路由到不同的认证处理方法
if (METHOD_REGISTER.equals(method)) {
return handleRegister(clientId, message, codecType, socket);
} else if (METHOD_AUTH.equals(method)) {
return handleAuth(clientId, message, codecType, socket);
} else {
log.warn("[handleAuthentication][不支持的认证方法method: {}]", method);
return AuthResult.failure("不支持的认证方法: " + method);
}
}
@Override
public void handleBusinessMessage(String clientId, IotDeviceMessage message,
String codecType, NetSocket socket,
String productKey, String deviceName, String serverId) {
try {
// 发送消息到消息总线
deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId);
log.info("[handleBusinessMessage][JT808 业务消息已发送clientId: {}, method: {}, messageId: {}]",
clientId, message.getMethod(), message.getId());
} catch (Exception e) {
log.error("[handleBusinessMessage][JT808 业务消息处理异常clientId: {}]", clientId, e);
}
}
@Override
public void sendResponse(NetSocket socket, IotDeviceMessage originalMessage,
boolean success, String message, String codecType) {
// JT808 协议的响应由具体的处理方法发送(注册应答、通用应答等)
// 此方法不需要实现,保留空实现
log.debug("[sendResponse][JT808 协议响应由具体方法发送,跳过通用响应]");
}
// ========== 认证处理方法 ==========
/**
* 处理终端注册0x0100
* <p>
* 流程:
* 1. 提取终端手机号deviceName
* 2. 查询设备是否存在
* 3. 生成鉴权码(根据设备的认证类型)
* 4. 缓存鉴权码
* 5. 发送注册应答0x8100
* <p>
* 注意注册成功后设备还需要发送鉴权消息0x0102才能完成认证
*
* @param clientId 客户端ID
* @param message 注册消息
* @param codecType 编解码类型
* @param socket 网络连接
* @return 认证结果PENDING 表示注册成功但需要继续鉴权)
*/
private AuthResult handleRegister(String clientId, IotDeviceMessage message,
String codecType, NetSocket socket) {
try {
// 1. 提取终端手机号
String terminalPhone = extractTerminalPhone(message);
int flowId = extractFlowId(message);
if (StrUtil.isBlank(terminalPhone)) {
log.warn("[handleRegister][无法提取终端手机号clientId: {}]", clientId);
sendRegisterResp(socket, "", 0, (byte) 1, null, codecType, message.getRequestId());
return AuthResult.failure("无法提取终端手机号");
}
// 2. 查询设备是否存在deviceName 就是终端手机号)
IotDeviceRespDTO device = findDeviceByDeviceName(terminalPhone);
if (device == null) {
log.warn("[handleRegister][设备不存在,终端手机号: {}]", terminalPhone);
sendRegisterResp(socket, terminalPhone, flowId, (byte) 1, null, codecType, message.getRequestId());
return AuthResult.failure("设备不存在");
}
// 3. 生成鉴权码(根据设备的认证类型)
String authToken = generateAuthToken(terminalPhone, device);
// 4. 缓存鉴权码到 Redis设置过期时间
String redisKey = String.format(JT808_AUTH_TOKEN, terminalPhone);
stringRedisTemplate.opsForValue().set(redisKey, authToken,
AUTH_TOKEN_EXPIRE_MINUTES, TimeUnit.MINUTES);
log.debug("[handleRegister][鉴权码已缓存到 Redis终端手机号: {}, 过期时间: {} 分钟]",
terminalPhone, AUTH_TOKEN_EXPIRE_MINUTES);
// 5. 发送注册应答成功result_code=0
sendRegisterResp(socket, terminalPhone, flowId, (byte) 0, authToken, codecType, message.getRequestId());
log.info("[handleRegister][JT808 注册成功,终端手机号: {}, 设备ID: {}, 鉴权码已生成]",
terminalPhone, device.getId());
// 返回 PENDING 状态:注册成功但还需要鉴权
return AuthResult.pending(device, "注册成功,等待鉴权");
} catch (Exception e) {
log.error("[handleRegister][JT808 注册处理异常clientId: {}]", clientId, e);
try {
String terminalPhone = extractTerminalPhone(message);
sendRegisterResp(socket, terminalPhone != null ? terminalPhone : "", 0,
(byte) 2, null, codecType, message.getRequestId());
} catch (Exception ex) {
log.error("[handleRegister][发送注册应答失败]", ex);
}
return AuthResult.failure("注册处理异常: " + e.getMessage());
}
}
/**
* 处理终端鉴权0x0102
* <p>
* 流程:
* 1. 提取终端手机号和鉴权码
* 2. 查询设备是否存在
* 3. 验证鉴权码(与注册时缓存的鉴权码比对)
* 4. 发送通用应答0x8001
* 5. 清除鉴权码缓存
* <p>
* 注意:鉴权成功后,设备才能发送业务消息
*
* @param clientId 客户端ID
* @param message 鉴权消息
* @param codecType 编解码类型
* @param socket 网络连接
* @return 认证结果SUCCESS 表示鉴权成功,可以发送业务消息)
*/
private AuthResult handleAuth(String clientId, IotDeviceMessage message,
String codecType, NetSocket socket) {
try {
// 1. 提取终端手机号和鉴权码
String terminalPhone = extractTerminalPhone(message);
String authCode = extractAuthCode(message);
int flowId = extractFlowId(message);
if (StrUtil.isBlank(terminalPhone)) {
log.warn("[handleAuth][无法提取终端手机号clientId: {}]", clientId);
sendCommonResp(socket, "", 0, 0x0102, (byte) 1, codecType, message.getRequestId());
return AuthResult.failure("无法提取终端手机号");
}
if (StrUtil.isBlank(authCode)) {
log.warn("[handleAuth][鉴权码为空,终端手机号: {}]", terminalPhone);
sendCommonResp(socket, terminalPhone, flowId, 0x0102, (byte) 1, codecType, message.getRequestId());
return AuthResult.failure("鉴权码为空");
}
// 2. 查询设备是否存在
IotDeviceRespDTO device = findDeviceByDeviceName(terminalPhone);
if (device == null) {
log.warn("[handleAuth][设备不存在,终端手机号: {}]", terminalPhone);
sendCommonResp(socket, terminalPhone, flowId, 0x0102, (byte) 1, codecType, message.getRequestId());
return AuthResult.failure("设备不存在");
}
// 3. 从 Redis 获取鉴权码
String redisKey = String.format(JT808_AUTH_TOKEN, terminalPhone);
String cachedAuthToken = stringRedisTemplate.opsForValue().get(redisKey);
if (StrUtil.isBlank(cachedAuthToken)) {
log.warn("[handleAuth][未找到鉴权码缓存,终端手机号: {},可能未注册或缓存已过期]", terminalPhone);
sendCommonResp(socket, terminalPhone, flowId, 0x0102, (byte) 1, codecType, message.getRequestId());
return AuthResult.failure("未找到鉴权码,请先注册");
}
// 验证鉴权码是否匹配Redis 自动处理过期,无需手动检查)
if (!authCode.equals(cachedAuthToken)) {
log.warn("[handleAuth][鉴权码验证失败,终端手机号: {}, 期望: {}, 实际: {}]",
terminalPhone, cachedAuthToken, authCode);
sendCommonResp(socket, terminalPhone, flowId, 0x0102, (byte) 1, codecType, message.getRequestId());
return AuthResult.failure("鉴权码错误");
}
// 4. 发送通用应答成功result_code=0
sendCommonResp(socket, terminalPhone, flowId, 0x0102, (byte) 0, codecType, message.getRequestId());
// 5. 从 Redis 删除鉴权码
stringRedisTemplate.delete(redisKey);
log.info("[handleAuth][JT808 鉴权成功,终端手机号: {}, 设备ID: {}]",
terminalPhone, device.getId());
// 返回 SUCCESS 状态:鉴权成功,设备可以发送业务消息
return AuthResult.success(device, "鉴权成功");
} catch (Exception e) {
log.error("[handleAuth][JT808 鉴权处理异常clientId: {}]", clientId, e);
try {
String terminalPhone = extractTerminalPhone(message);
int flowId = extractFlowId(message);
sendCommonResp(socket, terminalPhone != null ? terminalPhone : "", flowId,
0x0102, (byte) 2, codecType, message.getRequestId());
} catch (Exception ex) {
log.error("[handleAuth][发送通用应答失败]", ex);
}
return AuthResult.failure("鉴权处理异常: " + e.getMessage());
}
}
// ========== 鉴权码生成策略 ==========
/**
* 生成鉴权码
* <p>
* 使用系统标准认证 API 生成鉴权码,保持与标准协议认证的一致性。
* 鉴权码的生成策略由系统统一管理,支持:
* - SECRET一机一密使用设备的 deviceSecret
* - PRODUCT_SECRET一型一密使用产品的 productSecret
* - NONE免鉴权使用终端手机号
* <p>
* 注意:此方法基于 demo 的实现RegisterHandler:48demo 使用手机号作为鉴权码
*
* @param terminalPhone 终端手机号
* @param device 设备信息
* @return 鉴权码
*/
private String generateAuthToken(String terminalPhone, IotDeviceRespDTO device) {
try {
// 构建认证参数username 格式productKey.deviceName
// String username = IotDeviceAuthUtils.buildUsername(device.getProductKey(), device.getDeviceName());
// 调用系统标准认证 API 获取认证信息
// 注意:这里只是为了获取密钥信息,不是真正的认证
// 实际的鉴权码应该是设备的密钥deviceSecret 或 productSecret
// 但由于当前 API 不返回密钥,我们使用终端手机号作为鉴权码(与 demo 保持一致)
log.debug("[generateAuthToken][生成鉴权码,终端手机号: {}, 设备: {}, 认证类型: {}]",
terminalPhone, device.getDeviceName(),
StrUtil.isNotBlank(device.getAuthType()) ? device.getAuthType() : device.getProductAuthType());
// 使用终端手机号作为鉴权码(与 demo 保持一致)
// TODO: 后续可以根据 authType 从系统获取真实的密钥
return terminalPhone;
} catch (Exception e) {
log.error("[generateAuthToken][生成鉴权码异常,终端手机号: {}]", terminalPhone, e);
// 异常时使用终端手机号兜底
return terminalPhone;
}
}
// ========== JT808 协议响应方法 ==========
/**
* 发送终端注册应答0x8100
* <p>
* 消息格式:
* - replyFlowId: 应答流水号(对应注册消息的流水号)
* - replyCode: 结果码0-成功1-车辆已注册2-数据库无该车辆3-终端已注册4-数据库无该终端)
* - authToken: 鉴权码(成功时返回,用于后续鉴权)
*
* @param socket 网络连接
* @param phone 终端手机号
* @param replyFlowId 应答流水号
* @param replyCode 结果码
* @param authToken 鉴权码
* @param codecType 编解码类型
* @param requestId 请求ID
*/
private void sendRegisterResp(NetSocket socket, String phone, int replyFlowId, byte replyCode,
String authToken, String codecType, String requestId) {
try {
// 生成平台流水号
int flowId = (int) (System.currentTimeMillis() % 65535) + 1;
// 构建注册应答消息参数
Map<String, Object> params = MapUtil.<String, Object>builder()
.put("phone", phone)
.put("replyFlowId", replyFlowId)
.put("replyCode", (int) replyCode)
.put("authToken", authToken)
.put("flowId", flowId)
.build();
// 构建响应消息
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(
requestId,
"jt808.platform.registerResp",
params,
replyCode == 0 ? 0 : 401,
replyCode == 0 ? "注册成功" : "注册失败"
);
// 编码并发送
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType);
socket.write(Buffer.buffer(encodedData));
log.debug("[sendRegisterResp][发送注册应答,终端手机号: {}, 结果码: {}]", phone, replyCode);
} catch (Exception e) {
log.error("[sendRegisterResp][发送注册应答失败,终端手机号: ]", phone, e);
}
}
/**
* 发送平台通用应答0x8001
* <p>
* 消息格式:
* - replyFlowId: 应答流水号(对应原消息的流水号)
* - replyId: 应答ID对应原消息的消息ID如 0x0102
* - replyCode: 结果码0-成功1-失败2-消息有误3-不支持)
*
* @param socket 网络连接
* @param phone 终端手机号
* @param replyFlowId 应答流水号
* @param replyId 应答ID
* @param replyCode 结果码
* @param codecType 编解码类型
* @param requestId 请求ID
*/
private void sendCommonResp(NetSocket socket, String phone, int replyFlowId, int replyId,
byte replyCode, String codecType, String requestId) {
try {
// 生成平台流水号
int flowId = (int) (System.currentTimeMillis() % 65535) + 1;
// 构建通用应答消息参数
Map<String, Object> params = MapUtil.<String, Object>builder()
.put("phone", phone)
.put("replyFlowId", replyFlowId)
.put("replyId", replyId)
.put("replyCode", (int) replyCode)
.put("flowId", flowId)
.build();
// 构建响应消息
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(
requestId,
"jt808.platform.commonResp",
params,
replyCode == 0 ? 0 : 401,
replyCode == 0 ? "成功" : "失败"
);
// 编码并发送
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType);
socket.write(Buffer.buffer(encodedData));
log.debug("[sendCommonResp][发送通用应答,终端手机号: {}, 应答ID: 0x{}, 结果码: {}]",
phone, Integer.toHexString(replyId), replyCode);
} catch (Exception e) {
log.error("[sendCommonResp][发送通用应答失败,终端手机号: {}]", phone, e);
}
}
// ========== 消息参数提取方法 ==========
/**
* 从消息中提取终端手机号
* <p>
* 终端手机号存储在消息的 _metadata.terminalPhone 字段中,
* 由 IotJt808DeviceMessageCodec 解码时自动填充
*
* @param message 设备消息
* @return 终端手机号,提取失败返回 null
*/
@SuppressWarnings("unchecked")
private String extractTerminalPhone(IotDeviceMessage message) {
if (message.getParams() instanceof Map) {
Map<?, ?> params = (Map<?, ?>) message.getParams();
if (params.get("_metadata") instanceof Map) {
Map<?, ?> metadata = (Map<?, ?>) params.get("_metadata");
Object terminalPhone = metadata.get("terminalPhone");
if (terminalPhone != null) {
return terminalPhone.toString();
}
}
}
return null;
}
/**
* 从消息中提取流水号
* <p>
* 流水号存储在消息的 _metadata.flowId 字段中
*
* @param message 设备消息
* @return 流水号,提取失败返回 1
*/
@SuppressWarnings("unchecked")
private int extractFlowId(IotDeviceMessage message) {
if (message.getParams() instanceof Map) {
Map<?, ?> params = (Map<?, ?>) message.getParams();
if (params.get("_metadata") instanceof Map) {
Map<?, ?> metadata = (Map<?, ?>) params.get("_metadata");
Object flowId = metadata.get("flowId");
if (flowId instanceof Number) {
return ((Number) flowId).intValue();
}
}
}
return 1;
}
/**
* 从消息中提取鉴权码
* <p>
* 鉴权码存储在消息的 params.authCode 字段中,
* 由 IotJt808DeviceMessageCodec 解码时填充
*
* @param message 设备消息
* @return 鉴权码,提取失败返回 null
*/
@SuppressWarnings("unchecked")
private String extractAuthCode(IotDeviceMessage message) {
if (message.getParams() instanceof Map) {
Map<?, ?> params = (Map<?, ?>) message.getParams();
Object authCode = params.get("authCode");
if (authCode != null) {
return authCode.toString();
}
}
return null;
}
/**
* 通过 deviceName 查找设备
* <p>
* 对于 JT808 设备deviceName 就是终端手机号(纯数字)
*
* @param deviceName 设备名称(终端手机号)
* @return 设备信息,查询失败返回 null
*/
private IotDeviceRespDTO findDeviceByDeviceName(String deviceName) {
try {
// 调用设备 API 查询(只传 deviceName
CommonResult<IotDeviceRespDTO> result =
deviceApi.getDevice(new IotDeviceGetReqDTO().setDeviceName(deviceName));
if (result.isSuccess() && result.getData() != null) {
return result.getData();
}
log.warn("[findDeviceByDeviceName][设备不存在deviceName: {}]", deviceName);
return null;
} catch (Exception e) {
log.error("[findDeviceByDeviceName][查询设备失败deviceName: {}]", deviceName, e);
return null;
}
}
}

View File

@@ -0,0 +1,105 @@
package com.viewshanghai.module.iot.gateway.protocol.tcp.handler;
import com.viewshanghai.module.iot.core.mq.message.IotDeviceMessage;
import io.vertx.core.net.NetSocket;
/**
* TCP 协议处理器接口
* <p>
* 用于实现不同协议的插件化处理,每个协议实现此接口以提供:
* 1. 协议识别能力canHandle
* 2. 认证处理逻辑handleAuthentication
* 3. 业务消息处理handleBusinessMessage
* 4. 响应发送sendResponse
* <p>
* 设计原则:
* - 每个协议处理器独立负责自己的认证和消息处理逻辑
* - 主处理器IotTcpUpstreamHandler只负责路由不包含协议特定逻辑
* - 协议处理器通过 Spring 自动注入,实现热插拔
*
* @author lzh
*/
public interface ProtocolHandler {
/**
* 获取协议类型标识
* <p>
* 用于日志记录和调试建议使用大写英文STANDARD, JT808, MODBUS
*
* @return 协议类型标识
*/
String getProtocolType();
/**
* 判断是否能处理该消息
* <p>
* 根据消息内容和编解码类型判断是否由当前协议处理器处理。
* 注意:多个处理器可能都返回 true主处理器会选择第一个匹配的。
*
* @param message 已解码的设备消息
* @param codecType 消息编解码类型JSON, BINARY, JT808
* @return true-可以处理false-不能处理
*/
boolean canHandle(IotDeviceMessage message, String codecType);
/**
* 处理认证消息
* <p>
* 不同协议的认证流程可能不同:
* - 标准协议单次认证auth 方法)
* - JT808两步认证注册 + 鉴权)
* - Modbus可能无需认证
* <p>
* 认证成功后,处理器应返回 AuthResult.success(),主处理器会:
* 1. 注册连接到 ConnectionManager
* 2. 发送设备上线消息
* <p>
* 认证失败或需要多步认证时,返回 AuthResult.failure() 或 AuthResult.pending()
*
* @param clientId 客户端临时ID连接建立时生成
* @param message 认证消息
* @param codecType 消息编解码类型
* @param socket 网络连接(用于发送响应)
* @return 认证结果
*/
AuthResult handleAuthentication(String clientId, IotDeviceMessage message,
String codecType, NetSocket socket);
/**
* 处理业务消息
* <p>
* 处理已认证设备发送的业务消息(属性上报、事件上报等)。
* 业务消息通常需要转发到消息总线,由业务层处理。
* <p>
* 注意:此方法调用前,主处理器已验证设备已认证。
*
* @param clientId 客户端临时ID
* @param message 业务消息
* @param codecType 消息编解码类型
* @param socket 网络连接
* @param productKey 产品Key从连接信息中获取
* @param deviceName 设备名称(从连接信息中获取)
* @param serverId 服务器ID用于消息路由
*/
void handleBusinessMessage(String clientId, IotDeviceMessage message,
String codecType, NetSocket socket,
String productKey, String deviceName, String serverId);
/**
* 发送响应消息
* <p>
* 用于向设备发送响应消息(成功/失败/错误等)。
* 不同协议的响应格式可能不同,由协议处理器自行实现。
* <p>
* 注意:某些协议可能不需要响应(如单向上报),可以空实现。
*
* @param socket 网络连接
* @param originalMessage 原始请求消息(用于提取 requestId 等信息)
* @param success 是否成功
* @param message 响应消息内容
* @param codecType 消息编解码类型
*/
void sendResponse(NetSocket socket, IotDeviceMessage originalMessage,
boolean success, String message, String codecType);
}

View File

@@ -0,0 +1,192 @@
package com.viewshanghai.module.iot.gateway.protocol.tcp.handler;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.viewshanghai.framework.common.pojo.CommonResult;
import com.viewshanghai.module.iot.core.biz.IotDeviceCommonApi;
import com.viewshanghai.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import com.viewshanghai.module.iot.core.biz.dto.IotDeviceRespDTO;
import com.viewshanghai.module.iot.core.mq.message.IotDeviceMessage;
import com.viewshanghai.module.iot.core.util.IotDeviceAuthUtils;
import com.viewshanghai.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec;
import com.viewshanghai.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec;
import com.viewshanghai.module.iot.gateway.service.device.IotDeviceService;
import com.viewshanghai.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.net.NetSocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* 标准协议处理器
* <p>
* 处理使用标准认证流程的协议JSON 和 Binary 格式):
* 1. 单次认证:设备发送 auth 消息,包含 username 和 password
* 2. 认证成功后,设备可以发送业务消息
* 3. 业务消息转发到消息总线
* <p>
* 支持的编解码类型:
* - JSONIotTcpJsonDeviceMessageCodec
* - BINARYIotTcpBinaryDeviceMessageCodec
*
* @author lzh
*/
@Slf4j
@Component
public class StandardProtocolHandler extends AbstractProtocolHandler {
private static final String CODEC_TYPE_JSON = IotTcpJsonDeviceMessageCodec.TYPE;
private static final String CODEC_TYPE_BINARY = IotTcpBinaryDeviceMessageCodec.TYPE;
private final IotDeviceCommonApi deviceApi;
private final IotDeviceService deviceService;
public StandardProtocolHandler(IotDeviceMessageService deviceMessageService) {
super(deviceMessageService);
// 使用 SpringUtil 延迟获取,避免循环依赖
this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
this.deviceService = SpringUtil.getBean(IotDeviceService.class);
}
@Override
public String getProtocolType() {
return "STANDARD";
}
@Override
public boolean canHandle(IotDeviceMessage message, String codecType) {
// 处理 JSON 和 BINARY 格式的标准协议
return CODEC_TYPE_JSON.equals(codecType) || CODEC_TYPE_BINARY.equals(codecType);
}
@Override
public AuthResult handleAuthentication(String clientId, IotDeviceMessage message,
String codecType, NetSocket socket) {
try {
// 1. 解析认证参数
IotDeviceAuthReqDTO authParams = parseAuthParams(message.getParams());
if (authParams == null) {
log.warn("[handleAuthentication][认证参数解析失败clientId: {}]", clientId);
sendErrorResponse(socket, message, "认证参数不完整", codecType);
return AuthResult.failure("认证参数不完整");
}
// 2. 执行认证
if (!validateDeviceAuth(authParams)) {
log.warn("[handleAuthentication][认证失败clientId: , username: {}]",
clientId, authParams.getUsername());
sendErrorResponse(socket, message, "认证失败", codecType);
return AuthResult.failure("认证失败");
}
// 3. 解析设备信息
IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(authParams.getUsername());
if (deviceInfo == null) {
log.warn("[handleAuthentication][解析设备信息失败username: {}]", authParams.getUsername());
sendErrorResponse(socket, message, "解析设备信息失败", codecType);
return AuthResult.failure("解析设备信息失败");
}
// 4. 获取设备信息
IotDeviceRespDTO device = deviceService.getDeviceFromCache(
deviceInfo.getProductKey(), deviceInfo.getDeviceName());
if (device == null) {
log.warn("[handleAuthentication][设备不存在productKey: {}, deviceName: {}]",
deviceInfo.getProductKey(), deviceInfo.getDeviceName());
sendErrorResponse(socket, message, "设备不存在", codecType);
return AuthResult.failure("设备不存在");
}
// 5. 发送成功响应
sendSuccessResponse(socket, message, "认证成功", codecType);
log.info("[handleAuthentication][认证成功设备ID: {}, 设备名: {}]",
device.getId(), device.getDeviceName());
return AuthResult.success(device, "认证成功");
} catch (Exception e) {
log.error("[handleAuthentication][认证处理异常clientId: {}]", clientId, e);
sendErrorResponse(socket, message, "认证处理异常", codecType);
return AuthResult.failure("认证处理异常: " + e.getMessage());
}
}
@Override
public void handleBusinessMessage(String clientId, IotDeviceMessage message,
String codecType, NetSocket socket,
String productKey, String deviceName, String serverId) {
try {
// 发送消息到消息总线
deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId);
log.info("[handleBusinessMessage][发送消息到消息总线clientId: {}, method: {}, messageId: {}]",
clientId, message.getMethod(), message.getId());
} catch (Exception e) {
log.error("[handleBusinessMessage][业务消息处理异常clientId: {}]", clientId, e);
}
}
// ========== 私有方法 ==========
/**
* 解析认证参数
*
* @param params 参数对象(通常为 Map 类型)
* @return 认证参数 DTO解析失败时返回 null
*/
@SuppressWarnings("unchecked")
private IotDeviceAuthReqDTO parseAuthParams(Object params) {
if (params == null) {
return null;
}
try {
// 参数默认为 Map 类型,直接转换
if (params instanceof Map) {
Map<String, Object> paramMap = (Map<String, Object>) params;
return new IotDeviceAuthReqDTO()
.setClientId(MapUtil.getStr(paramMap, "clientId"))
.setUsername(MapUtil.getStr(paramMap, "username"))
.setPassword(MapUtil.getStr(paramMap, "password"));
}
// 如果已经是目标类型,直接返回
if (params instanceof IotDeviceAuthReqDTO) {
return (IotDeviceAuthReqDTO) params;
}
return null;
} catch (Exception e) {
log.error("[parseAuthParams][解析认证参数失败params: {}]", params, e);
return null;
}
}
/**
* 验证设备认证信息
*
* @param authParams 认证参数
* @return 是否认证成功
*/
private boolean validateDeviceAuth(IotDeviceAuthReqDTO authParams) {
try {
// 调用系统标准认证 API
CommonResult<Boolean> result = deviceApi.authDevice(new IotDeviceAuthReqDTO()
.setClientId(authParams.getClientId())
.setUsername(authParams.getUsername())
.setPassword(authParams.getPassword()));
result.checkError();
return BooleanUtil.isTrue(result.getData());
} catch (Exception e) {
log.error("[validateDeviceAuth][设备认证异常username: {}]", authParams.getUsername(), e);
return false;
}
}
}

View File

@@ -43,7 +43,10 @@ public class IotTcpDownstreamHandler {
return;
}
// 2. 根据产品 Key 和设备名称编码消息并发送到设备
// 2. 补充 deviceName 到 params 中(用于 JT808 等需要设备标识的编码器)
injectDeviceNameToParams(message, deviceInfo.getDeviceName());
// 3. 根据产品 Key 和设备名称编码消息并发送到设备
byte[] bytes = deviceMessageService.encodeDeviceMessage(message, deviceInfo.getProductKey(),
deviceInfo.getDeviceName());
boolean success = connectionManager.sendToDevice(message.getDeviceId(), bytes);
@@ -60,4 +63,21 @@ public class IotTcpDownstreamHandler {
}
}
/**
* 将 deviceName 注入到消息 params 中
*
* 对于 JT808 等需要设备标识(如终端手机号)的协议,
* 编码器可以从 params._deviceName 中获取
*/
@SuppressWarnings("unchecked")
private void injectDeviceNameToParams(IotDeviceMessage message, String deviceName) {
if (message.getParams() == null) {
message.setParams(new java.util.HashMap<>());
}
if (message.getParams() instanceof java.util.Map) {
((java.util.Map<String, Object>) message.getParams()).put("_deviceName", deviceName);
}
}
}

View File

@@ -1,30 +1,40 @@
package com.viewshanghai.module.iot.gateway.protocol.tcp.router;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.viewshanghai.framework.common.pojo.CommonResult;
import com.viewshanghai.framework.common.util.json.JsonUtils;
import com.viewshanghai.module.iot.core.biz.IotDeviceCommonApi;
import com.viewshanghai.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import com.viewshanghai.module.iot.core.biz.dto.IotDeviceRespDTO;
import com.viewshanghai.module.iot.core.mq.message.IotDeviceMessage;
import com.viewshanghai.module.iot.core.util.IotDeviceAuthUtils;
import com.viewshanghai.module.iot.gateway.codec.jt808.IotJt808DeviceMessageCodec;
import com.viewshanghai.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec;
import com.viewshanghai.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec;
import com.viewshanghai.module.iot.gateway.protocol.tcp.IotTcpUpstreamProtocol;
import com.viewshanghai.module.iot.gateway.protocol.tcp.handler.AuthResult;
import com.viewshanghai.module.iot.gateway.protocol.tcp.handler.ProtocolHandler;
import com.viewshanghai.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager;
import com.viewshanghai.module.iot.gateway.service.device.IotDeviceService;
import com.viewshanghai.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
/**
* TCP 上行消息处理器
* TCP 上行消息处理器(重构版)
* <p>
* 职责:
* 1. 管理 TCP 连接生命周期(连接建立、异常、关闭)
* 2. 检测消息格式类型JSON/Binary/JT808
* 3. 解码设备消息
* 4. 路由到对应的协议处理器
* 5. 管理设备认证状态
* 6. 发送设备上线/离线消息
* <p>
* 设计原则:
* - 主处理器只负责路由,不包含协议特定逻辑
* - 协议处理器通过 Spring 自动注入,实现插件化
* - 认证成功后,统一注册连接和发送上线消息
* - 支持多步认证(如 JT808 的注册+鉴权)
*
* @author 芋道源码
*/
@@ -33,27 +43,20 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
private static final String CODEC_TYPE_JSON = IotTcpJsonDeviceMessageCodec.TYPE;
private static final String CODEC_TYPE_BINARY = IotTcpBinaryDeviceMessageCodec.TYPE;
private static final String AUTH_METHOD = "auth";
private static final String CODEC_TYPE_JT808 = IotJt808DeviceMessageCodec.TYPE;
private final IotDeviceMessageService deviceMessageService;
private final IotDeviceService deviceService;
private final IotTcpConnectionManager connectionManager;
private final IotDeviceCommonApi deviceApi;
private final List<ProtocolHandler> protocolHandlers;
private final String serverId;
public IotTcpUpstreamHandler(IotTcpUpstreamProtocol protocol,
IotDeviceMessageService deviceMessageService,
IotDeviceService deviceService,
IotTcpConnectionManager connectionManager) {
IotTcpConnectionManager connectionManager,
List<ProtocolHandler> protocolHandlers) {
this.deviceMessageService = deviceMessageService;
this.deviceService = deviceService;
this.connectionManager = connectionManager;
this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
this.protocolHandlers = protocolHandlers;
this.serverId = protocol.getServerId();
}
@@ -64,9 +67,10 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
// 设置异常和关闭处理器
socket.exceptionHandler(ex -> {
log.warn("[handle][连接异常,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress());
log.warn("[handle][连接异常,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress(), ex);
cleanupConnection(socket);
});
socket.closeHandler(v -> {
log.debug("[handle][连接关闭,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress());
cleanupConnection(socket);
@@ -77,8 +81,8 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
try {
processMessage(clientId, buffer, socket);
} catch (Exception e) {
log.error("[handle][消息解码失败,断开连接,客户端 ID: {},地址: {},错误: {}]",
clientId, socket.remoteAddress(), e.getMessage());
log.error("[handle][消息处理失败,断开连接,客户端 ID: {},地址: {},错误: {}]",
clientId, socket.remoteAddress(), e.getMessage(), e);
cleanupConnection(socket);
socket.close();
}
@@ -87,6 +91,13 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
/**
* 处理消息
* <p>
* 流程:
* 1. 检测消息格式类型JSON/Binary/JT808
* 2. 解码消息
* 3. 查找协议处理器
* 4. 判断是否为认证消息
* 5. 路由到协议处理器处理
*
* @param clientId 客户端 ID
* @param buffer 消息
@@ -114,113 +125,125 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
throw new Exception("消息解码失败: " + e.getMessage(), e);
}
// 4. 根据消息类型路由处理
// 4. 查找协议处理
ProtocolHandler handler = findProtocolHandler(message, codecType);
if (handler == null) {
log.warn("[processMessage][未找到协议处理器codecType: {}, method: {}]",
codecType, message.getMethod());
return;
}
// 5. 判断是否为认证消息
if (isAuthenticationMessage(message)) {
handleAuthenticationWithProtocol(clientId, message, codecType, socket, handler);
} else {
handleBusinessWithProtocol(clientId, message, codecType, socket, handler);
}
}
/**
* 使用协议处理器处理认证消息
* <p>
* 认证结果处理:
* - SUCCESS注册连接发送上线消息
* - PENDING等待后续认证步骤如 JT808 注册后等待鉴权)
* - FAILURE认证失败不做处理协议处理器已发送失败响应
*
* @param clientId 客户端 ID
* @param message 认证消息
* @param codecType 消息编解码类型
* @param socket 网络连接
* @param handler 协议处理器
*/
private void handleAuthenticationWithProtocol(String clientId, IotDeviceMessage message,
String codecType, NetSocket socket,
ProtocolHandler handler) {
try {
if (AUTH_METHOD.equals(message.getMethod())) {
// 认证请求
handleAuthenticationRequest(clientId, message, codecType, socket);
// 委托给协议处理器
AuthResult result = handler.handleAuthentication(clientId, message, codecType, socket);
// 根据认证结果处理
if (result.isSuccess()) {
// 认证成功:注册连接并发送上线消息
registerConnection(socket, result.getDevice(), clientId, codecType);
sendOnlineMessage(result.getDevice());
log.info("[handleAuthentication][认证成功,设备: {}, 协议: {}]",
result.getDevice().getDeviceName(), handler.getProtocolType());
} else if (result.isPending()) {
// 认证待定:等待后续认证步骤(如 JT808 注册后等待鉴权)
log.info("[handleAuthentication][认证待定,设备: {}, 协议: {}, 消息: {}]",
result.getDevice() != null ? result.getDevice().getDeviceName() : "unknown",
handler.getProtocolType(), result.getMessage());
} else {
// 业务消息
handleBusinessRequest(clientId, message, codecType, socket);
// 认证失败:协议处理器已发送失败响应,这里只记录日志
log.warn("[handleAuthentication][认证失败clientId: {}, 协议: {}, 原因: {}]",
clientId, handler.getProtocolType(), result.getMessage());
}
} catch (Exception e) {
log.error("[processMessage][处理消息失败,客户端 ID: {},消息方法: {}]",
clientId, message.getMethod(), e);
// 发送错误响应,避免客户端一直等待
try {
sendErrorResponse(socket, message.getRequestId(), "消息处理失败", codecType);
} catch (Exception responseEx) {
log.error("[processMessage][发送错误响应失败,客户端 ID: {}]", clientId, responseEx);
}
log.error("[handleAuthentication][认证异常clientId: {}, 协议: {}]",
clientId, handler.getProtocolType(), e);
handler.sendResponse(socket, message, false, "认证异常", codecType);
}
}
/**
* 处理认证请求
* 使用协议处理器处理业务消息
* <p>
* 前置条件:设备已认证
*
* @param clientId 客户端 ID
* @param message 消息信
* @param message 业务消息
* @param codecType 消息编解码类型
* @param socket 网络连接
* @param handler 协议处理器
*/
private void handleAuthenticationRequest(String clientId, IotDeviceMessage message, String codecType,
NetSocket socket) {
try {
// 1.1 解析认证参数
IotDeviceAuthReqDTO authParams = parseAuthParams(message.getParams());
if (authParams == null) {
log.warn("[handleAuthenticationRequest][认证参数解析失败,客户端 ID: {}]", clientId);
sendErrorResponse(socket, message.getRequestId(), "认证参数不完整", codecType);
return;
}
// 1.2 执行认证
if (!validateDeviceAuth(authParams)) {
log.warn("[handleAuthenticationRequest][认证失败,客户端 ID: {}username: {}]",
clientId, authParams.getUsername());
sendErrorResponse(socket, message.getRequestId(), "认证失败", codecType);
return;
}
// 2.1 解析设备信息
IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(authParams.getUsername());
if (deviceInfo == null) {
sendErrorResponse(socket, message.getRequestId(), "解析设备信息失败", codecType);
return;
}
// 2.2 获取设备信息
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(),
deviceInfo.getDeviceName());
if (device == null) {
sendErrorResponse(socket, message.getRequestId(), "设备不存在", codecType);
return;
}
// 3.1 注册连接
registerConnection(socket, device, clientId, codecType);
// 3.2 发送上线消息
sendOnlineMessage(device);
// 3.3 发送成功响应
sendSuccessResponse(socket, message.getRequestId(), "认证成功", codecType);
log.info("[handleAuthenticationRequest][认证成功,设备 ID: {},设备名: {}]",
device.getId(), device.getDeviceName());
} catch (Exception e) {
log.error("[handleAuthenticationRequest][认证处理异常,客户端 ID: {}]", clientId, e);
sendErrorResponse(socket, message.getRequestId(), "认证处理异常", codecType);
}
}
/**
* 处理业务请求
*
* @param clientId 客户端 ID
* @param message 消息信息
* @param codecType 消息编解码类型
* @param socket 网络连接
*/
private void handleBusinessRequest(String clientId, IotDeviceMessage message, String codecType, NetSocket socket) {
private void handleBusinessWithProtocol(String clientId, IotDeviceMessage message,
String codecType, NetSocket socket,
ProtocolHandler handler) {
try {
// 1. 检查认证状态
if (connectionManager.isNotAuthenticated(socket)) {
log.warn("[handleBusinessRequest][设备未认证,客户端 ID: {}]", clientId);
sendErrorResponse(socket, message.getRequestId(), "请先进行认证", codecType);
log.warn("[handleBusinessMessage][设备未认证,clientId: {}]", clientId);
handler.sendResponse(socket, message, false, "请先进行认证", codecType);
return;
}
// 2. 获取认证信息并处理业务消
// 2. 获取连接信
IotTcpConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket);
if (connectionInfo == null) {
log.error("[handleBusinessMessage][连接信息不存在clientId: {}]", clientId);
return;
}
// 3. 委托给协议处理器处理业务消息
handler.handleBusinessMessage(
clientId,
message,
codecType,
socket,
connectionInfo.getProductKey(),
connectionInfo.getDeviceName(),
serverId
);
// 3. 发送消息到消息总线
deviceMessageService.sendDeviceMessage(message, connectionInfo.getProductKey(),
connectionInfo.getDeviceName(), serverId);
log.info("[handleBusinessRequest][发送消息到消息总线,客户端 ID: {},消息: {}",
clientId, message.toString());
} catch (Exception e) {
log.error("[handleBusinessRequest][业务请求处理异常,客户端 ID: {}]", clientId, e);
log.error("[handleBusinessMessage][业务消息处理异常,clientId: {}]", clientId, e);
}
}
/**
* 获取消息编解码类型
* <p>
* 检测优先级:
* 1. 如果已认证,使用缓存的编解码类型
* 2. 未认证时,通过消息格式自动检测:
* - JT808首尾标识符 0x7e
* - Binary魔术字 0x7E
* - JSON默认
*
* @param buffer 消息
* @param socket 网络连接
@@ -235,15 +258,69 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
}
// 2. 未认证时检测消息格式类型
return IotTcpBinaryDeviceMessageCodec.isBinaryFormatQuick(buffer.getBytes()) ? CODEC_TYPE_BINARY
: CODEC_TYPE_JSON;
byte[] data = buffer.getBytes();
// 2.1 检测是否为 JT808 格式(首尾标识符 0x7e
if (IotJt808DeviceMessageCodec.isJt808Format(data)) {
return CODEC_TYPE_JT808;
}
// 2.2 检测是否为自定义二进制格式(魔术字 0x7E
if (IotTcpBinaryDeviceMessageCodec.isBinaryFormatQuick(data)) {
return CODEC_TYPE_BINARY;
}
// 2.3 默认为 JSON 格式
return CODEC_TYPE_JSON;
}
/**
* 查找协议处理器
* <p>
* 遍历所有协议处理器,返回第一个能处理该消息的处理器
*
* @param message 设备消息
* @param codecType 消息编解码类型
* @return 协议处理器,未找到返回 null
*/
private ProtocolHandler findProtocolHandler(IotDeviceMessage message, String codecType) {
return protocolHandlers.stream()
.filter(handler -> handler.canHandle(message, codecType))
.findFirst()
.orElse(null);
}
/**
* 判断是否为认证消息
* <p>
* 认证消息包括:
* - 标准认证auth
* - JT808 注册jt808.terminal.register
* - JT808 鉴权jt808.terminal.auth
*
* @param message 设备消息
* @return true-是认证消息false-不是
*/
private boolean isAuthenticationMessage(IotDeviceMessage message) {
String method = message.getMethod();
return "auth".equals(method)
|| "jt808.terminal.register".equals(method)
|| "jt808.terminal.auth".equals(method);
}
/**
* 注册连接信息
* <p>
* 将设备连接信息注册到连接管理器,包括:
* - 设备 ID
* - 产品 Key
* - 设备名称
* - 客户端 ID
* - 编解码类型
* - 认证状态
*
* @param socket 网络连接
* @param device 设备
* @param device 设备信息
* @param clientId 客户端 ID
* @param codecType 消息编解码类型
*/
@@ -256,12 +333,15 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
.setClientId(clientId)
.setCodecType(codecType)
.setAuthenticated(true);
// 注册连接
// 注册连接(如果设备已有其他连接,会自动断开旧连接)
connectionManager.registerConnection(socket, device.getId(), connectionInfo);
}
/**
* 发送设备上线消息
* <p>
* 设备认证成功后,发送上线消息到消息总线,通知业务层设备已上线
*
* @param device 设备信息
*/
@@ -270,6 +350,9 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline();
deviceMessageService.sendDeviceMessage(onlineMessage, device.getProductKey(),
device.getDeviceName(), serverId);
log.debug("[sendOnlineMessage][发送上线消息成功,设备: {}]", device.getDeviceName());
} catch (Exception e) {
log.error("[sendOnlineMessage][发送上线消息失败,设备: {}]", device.getDeviceName(), e);
}
@@ -277,6 +360,8 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
/**
* 清理连接
* <p>
* 连接关闭或异常时,清理连接信息并发送离线消息
*
* @param socket 网络连接
*/
@@ -284,125 +369,20 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
try {
// 1. 发送离线消息(如果已认证)
IotTcpConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket);
if (connectionInfo != null) {
if (connectionInfo != null && connectionInfo.isAuthenticated()) {
IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
deviceMessageService.sendDeviceMessage(offlineMessage, connectionInfo.getProductKey(),
connectionInfo.getDeviceName(), serverId);
log.debug("[cleanupConnection][发送离线消息成功,设备: {}]", connectionInfo.getDeviceName());
}
// 2. 注销连接
connectionManager.unregisterConnection(socket);
} catch (Exception e) {
log.error("[cleanupConnection][清理连接失败]", e);
}
}
/**
* 发送响应消息
*
* @param socket 网络连接
* @param success 是否成功
* @param message 消息
* @param requestId 请求 ID
* @param codecType 消息编解码类型
*/
private void sendResponse(NetSocket socket, boolean success, String message, String requestId, String codecType) {
try {
Object responseData = MapUtil.builder()
.put("success", success)
.put("message", message)
.build();
int code = success ? 0 : 401;
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, AUTH_METHOD, responseData,
code, message);
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType);
socket.write(Buffer.buffer(encodedData));
} catch (Exception e) {
log.error("[sendResponse][发送响应失败requestId: {}]", requestId, e);
}
}
/**
* 验证设备认证信息
*
* @param authParams 认证参数
* @return 是否认证成功
*/
private boolean validateDeviceAuth(IotDeviceAuthReqDTO authParams) {
try {
CommonResult<Boolean> result = deviceApi.authDevice(new IotDeviceAuthReqDTO()
.setClientId(authParams.getClientId()).setUsername(authParams.getUsername())
.setPassword(authParams.getPassword()));
result.checkError();
return BooleanUtil.isTrue(result.getData());
} catch (Exception e) {
log.error("[validateDeviceAuth][设备认证异常username: {}]", authParams.getUsername(), e);
return false;
}
}
/**
* 发送错误响应
*
* @param socket 网络连接
* @param requestId 请求 ID
* @param errorMessage 错误消息
* @param codecType 消息编解码类型
*/
private void sendErrorResponse(NetSocket socket, String requestId, String errorMessage, String codecType) {
sendResponse(socket, false, errorMessage, requestId, codecType);
}
/**
* 发送成功响应
*
* @param socket 网络连接
* @param requestId 请求 ID
* @param message 消息
* @param codecType 消息编解码类型
*/
@SuppressWarnings("SameParameterValue")
private void sendSuccessResponse(NetSocket socket, String requestId, String message, String codecType) {
sendResponse(socket, true, message, requestId, codecType);
}
/**
* 解析认证参数
*
* @param params 参数对象(通常为 Map 类型)
* @return 认证参数 DTO解析失败时返回 null
*/
@SuppressWarnings("unchecked")
private IotDeviceAuthReqDTO parseAuthParams(Object params) {
if (params == null) {
return null;
}
try {
// 参数默认为 Map 类型,直接转换
if (params instanceof java.util.Map) {
java.util.Map<String, Object> paramMap = (java.util.Map<String, Object>) params;
return new IotDeviceAuthReqDTO()
.setClientId(MapUtil.getStr(paramMap, "clientId"))
.setUsername(MapUtil.getStr(paramMap, "username"))
.setPassword(MapUtil.getStr(paramMap, "password"));
}
// 如果已经是目标类型,直接返回
if (params instanceof IotDeviceAuthReqDTO) {
return (IotDeviceAuthReqDTO) params;
}
// 其他情况尝试 JSON 转换
String jsonStr = JsonUtils.toJsonString(params);
return JsonUtils.parseObject(jsonStr, IotDeviceAuthReqDTO.class);
} catch (Exception e) {
log.error("[parseAuthParams][解析认证参数({})失败]", params, e);
return null;
}
}
}
}