Compare commits

4 Commits

Author SHA1 Message Date
lzh
c88dc3fc10 fix(ops): 修正工牌设备状态事件处理器配置
Some checks failed
Java CI with Maven / build (11) (push) Has been cancelled
Java CI with Maven / build (17) (push) Has been cancelled
Java CI with Maven / build (8) (push) Has been cancelled
- 包名从 handler 改为 listener
- 使用 TransactionalEventListener 替代 EventListener
- 线程池名修正为 ops-task-executor
- 状态从 ASSIGNED 改为 PENDING
- 清理测试文件中的重复代码

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-21 18:05:39 +08:00
lzh
5dab9622d6 fix(iot): TCP消息处理容错改进,避免异常断开连接
- 消息格式无法识别时记录警告并跳过,不断开连接
- 消息解码失败时记录警告并跳过,不断开连接
- 设备不存在时断开连接,其他认证失败只记录日志
- 添加 bytesToHex 工具方法用于调试

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-21 18:05:23 +08:00
lzh
7c7b122de3 fix(iot): 修复JT808业务消息重复应答问题
- 在 handleBusinessMessage 中设置 message.setCode(0) 标记协议层已应答
- 避免业务层重复发送 reply 消息

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-21 18:04:57 +08:00
lzh
eb5513f9b6 fix(iot): 修复JT808事件消息identifier字段不匹配问题
- 修复 parseButtonEventAsEvent 使用 eventId 而非 identifier
- 添加消息体长度校验,防止越界访问
- 解码失败时返回 null 而非抛异常,避免断开连接

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-21 18:04:21 +08:00
6 changed files with 100 additions and 43 deletions

View File

@@ -90,11 +90,20 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
// 2. 解析为 JT808 数据包
Jt808DataPack dataPack = decoder.bytes2PackageData(unescapedBytes);
// 3. 转换为统一的 IotDeviceMessage
// 3. 检查消息体是否有效null 表示长度不匹配错误,空数组表示合法的空消息体如心跳)
if (dataPack.getBodyBytes() == null) {
// 长度不匹配,返回 null 由上层跳过该消息
log.warn("[decode][消息体长度不匹配跳过该消息消息ID=0x{}]",
Integer.toHexString(dataPack.getPackHead().getId()));
return null;
}
// 4. 转换为统一的 IotDeviceMessage包括空消息体的消息如心跳
return convertToIotDeviceMessage(dataPack);
} catch (Exception e) {
log.error("[decode][JT808 消息解码失败,数据长度: {}]", bytes.length, e);
throw new RuntimeException("JT808 消息解码失败: " + e.getMessage(), e);
// 解码失败返回 null不断开连接
return null;
}
}
@@ -208,7 +217,7 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
*
* 物模型标准格式:
* {
* "eventId": "button_event",
* "identifier": "button_event",
* "eventTime": 1234567890,
* "params": {
* "keyId": 1,
@@ -225,7 +234,8 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
Map<String, Object> result = new HashMap<>();
// 统一使用一个事件标识符,通过 isLongPress 参数区分短按和长按
result.put("eventId", "button_event");
// 注意:必须使用 "identifier" 字段,以便 IotDeviceMessageUtils.getIdentifier() 正确提取
result.put("identifier", "button_event");
// 事件时间戳
result.put("eventTime", System.currentTimeMillis());

View File

@@ -43,6 +43,29 @@ public class Jt808Decoder {
msgBodyByteStartIndex = 16;
}
// 验证消息体长度,防止越界
int expectedBodyLength = msgHeader.getBodyLength();
int actualAvailableLength = data.length - msgBodyByteStartIndex - 1; // -1 for checksum
// 检查数据长度是否足够(至少要包含消息头和校验码)
if (data.length < msgBodyByteStartIndex + 1) {
log.warn("[bytes2PackageData][数据长度不足: 头部需要={}, 实际={}, 消息ID=0x{}]",
msgBodyByteStartIndex + 1, data.length, Integer.toHexString(msgHeader.getId()));
ret.setBodyBytes(null); // 使用 null 标记错误状态
ret.setCheckSum(0);
return ret;
}
if (expectedBodyLength > actualAvailableLength) {
log.warn("[bytes2PackageData][消息体长度不匹配: 头部声明={}, 实际可用={}, 数据总长度={}, 消息ID=0x{}]",
expectedBodyLength, actualAvailableLength, data.length,
Integer.toHexString(msgHeader.getId()));
// 使用 null 标记长度不匹配(区别于合法的空消息体如心跳)
ret.setBodyBytes(null);
ret.setCheckSum(data[data.length - 1]);
return ret;
}
byte[] bodyBytes = new byte[msgHeader.getBodyLength()];
System.arraycopy(data, msgBodyByteStartIndex, bodyBytes, 0, bodyBytes.length);
ret.setBodyBytes(bodyBytes);

View File

@@ -113,7 +113,11 @@ public class Jt808ProtocolHandler extends AbstractProtocolHandler {
String codecType, NetSocket socket,
String productKey, String deviceName, String serverId) {
try {
// 1. 发送消息到消息总线
// 1. 标记消息为"协议层已应答",业务层无需再发送 reply
// 设置 code = 0 后isReplyMessage() 返回 true业务层跳过 reply
message.setCode(0);
// 2. 发送消息到消息总线
deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId);
log.info("[handleBusinessMessage][JT808 业务消息已发送clientId: {}, method: {}, messageId: {}]",

View File

@@ -112,17 +112,30 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
// 2. 获取消息格式类型
String codecType = getMessageCodecType(buffer, socket);
if (codecType == null) {
// 无法识别消息格式,记录警告但不断开连接
byte[] data = buffer.getBytes();
String preview = bytesToHex(data, Math.min(32, data.length));
log.warn("[processMessage][无法识别消息格式跳过该消息clientId: {}, 数据前32字节: {}]",
clientId, preview);
return;
}
// 3. 解码消息
IotDeviceMessage message;
try {
message = deviceMessageService.decodeDeviceMessage(buffer.getBytes(), codecType);
if (message == null) {
throw new Exception("解码后消息为空");
// 解码失败(如消息格式错误),记录警告但不断开连接
log.warn("[processMessage][消息解码失败跳过该消息clientId: {}, codecType: {}]",
clientId, codecType);
return;
}
} catch (Exception e) {
// 消息格式错误时抛出异常,由上层处理连接断开
throw new Exception("消息解码失败: " + e.getMessage(), e);
// 其他异常也记录警告但不断开连接
log.warn("[processMessage][消息解码异常跳过该消息clientId: {}, codecType: {}, 错误: {}]",
clientId, codecType, e.getMessage());
return;
}
// 4. 查找协议处理器
@@ -147,7 +160,7 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
* 认证结果处理:
* - SUCCESS注册连接发送上线消息
* - PENDING等待后续认证步骤如 JT808 注册后等待鉴权)
* - FAILURE认证失败不做处理(协议处理器已发送失败响应)
* - FAILURE认证失败如果设备不存在则断开连接,否则只记录日志
*
* @param clientId 客户端 ID
* @param message 认证消息
@@ -178,9 +191,16 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
handler.getProtocolType(), result.getMessage());
} else {
// 认证失败:协议处理器已发送失败响应,这里只记录日志
// 认证失败
String failureReason = result.getMessage();
log.warn("[handleAuthentication][认证失败clientId: {}, 协议: {}, 原因: {}]",
clientId, handler.getProtocolType(), result.getMessage());
clientId, handler.getProtocolType(), failureReason);
// 如果设备不存在,断开连接
if (failureReason != null && failureReason.contains("设备不存在")) {
log.warn("[handleAuthentication][设备不存在断开连接clientId: {}]", clientId);
socket.close();
}
}
} catch (Exception e) {
@@ -373,7 +393,7 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
*/
private void cleanupConnection(NetSocket socket) {
try {
// 1. 发送离线消息(如果认证)
// 1. 发送离线消息(如果<EFBFBD><EFBFBD><EFBFBD>认证)
IotTcpConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket);
if (connectionInfo != null && connectionInfo.isAuthenticated()) {
IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
@@ -391,4 +411,26 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
}
}
/**
* 字节数组转十六进制字符串
*
* @param bytes 字节数组
* @param limit 最大长度
* @return 十六进制字符串
*/
private String bytesToHex(byte[] bytes, int limit) {
if (bytes == null || bytes.length == 0) {
return "";
}
int length = Math.min(bytes.length, limit);
StringBuilder sb = new StringBuilder(length * 2);
for (int i = 0; i < length; i++) {
sb.append(String.format("%02X", bytes[i]));
}
if (bytes.length > limit) {
sb.append("...");
}
return sb.toString();
}
}

View File

@@ -1,4 +1,4 @@
package com.viewsh.module.ops.environment.integration.handler;
package com.viewsh.module.ops.environment.integration.listener;
import com.viewsh.module.ops.environment.service.badge.BadgeDeviceStatusService;
import com.viewsh.module.ops.core.event.OrderStateChangedEvent;
@@ -9,6 +9,8 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
/**
* 工牌设备状态事件处理器
@@ -42,8 +44,8 @@ public class BadgeDeviceStatusEventHandler {
*
* @param event 工单状态变更事件
*/
@EventListener
@Async("opsExecutor")
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
@Async("ops-task-executor")
public void onOrderStateChanged(OrderStateChangedEvent event) {
try {
// 只处理保洁类型的工单
@@ -76,7 +78,7 @@ public class BadgeDeviceStatusEventHandler {
WorkOrderStatusEnum newStatus = event.getNewStatus();
switch (newStatus) {
case ASSIGNED:
case PENDING:
// 工单已分配,不影响设备状态(设备可能还在处理其他任务)
// 设置设备当前工单
badgeDeviceStatusService.setCurrentOrder(deviceId, event.getOrderId());
@@ -171,9 +173,9 @@ public class BadgeDeviceStatusEventHandler {
* 1. 将被打断的设备状态转为 PAUSED
* 2. 记录被打断的工单ID
*
* @param deviceId 被打断的设备ID
* @param deviceId 被打断的设备ID
* @param interruptedOrderId 被打断的工单ID
* @param urgentOrderId P0紧急工单ID
* @param urgentOrderId P0紧急工单ID
*/
public void handleP0Interrupt(Long deviceId, Long interruptedOrderId, Long urgentOrderId) {
log.info("[BadgeDeviceStatusEventHandler] P0紧急任务打断: deviceId={}, interruptedOrderId={}, urgentOrderId={}",

View File

@@ -84,31 +84,7 @@ public class OpsOrderServiceTest {
// 模拟 MyBatis Plus 的 ID 自动回填
if (order.getId() == null) {
order.setId(1L);
@Test
void testAssignOrder_FromQueuedStatus_Success() {
// Given
testOrder.setStatus(WorkOrderStatusEnum.QUEUED.getStatus());
OpsOrderAssignReqDTO assignReq = new OpsOrderAssignReqDTO();
assignReq.setOrderId(1L);
assignReq.setAssigneeId(2001L);
assignReq.setRemark("排队后分配");
when(opsOrderMapper.selectById(1L)).thenReturn(testOrder);
// When
assertDoesNotThrow(() -> opsOrderService.assignOrder(assignReq, OperatorTypeEnum.SYSTEM, null));
// Then
verify(orderStateMachine, times(1)).transition(
eq(testOrder),
eq(WorkOrderStatusEnum.DISPATCHED),
eq(OperatorTypeEnum.SYSTEM),
eq(null),
eq("排队后分配")
);
assertEquals(2001L, testOrder.getAssigneeId());
}
}
}
return 1;
});