Compare commits
4 Commits
447ea8fe90
...
c88dc3fc10
| Author | SHA1 | Date | |
|---|---|---|---|
| c88dc3fc10 | |||
| 5dab9622d6 | |||
| 7c7b122de3 | |||
| eb5513f9b6 |
@@ -90,11 +90,20 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
|
||||
// 2. 解析为 JT808 数据包
|
||||
Jt808DataPack dataPack = decoder.bytes2PackageData(unescapedBytes);
|
||||
|
||||
// 3. 转换为统一的 IotDeviceMessage
|
||||
// 3. 检查消息体是否有效(null 表示长度不匹配错误,空数组表示合法的空消息体如心跳)
|
||||
if (dataPack.getBodyBytes() == null) {
|
||||
// 长度不匹配,返回 null 由上层跳过该消息
|
||||
log.warn("[decode][消息体长度不匹配,跳过该消息,消息ID=0x{}]",
|
||||
Integer.toHexString(dataPack.getPackHead().getId()));
|
||||
return null;
|
||||
}
|
||||
|
||||
// 4. 转换为统一的 IotDeviceMessage(包括空消息体的消息如心跳)
|
||||
return convertToIotDeviceMessage(dataPack);
|
||||
} catch (Exception e) {
|
||||
log.error("[decode][JT808 消息解码失败,数据长度: {}]", bytes.length, e);
|
||||
throw new RuntimeException("JT808 消息解码失败: " + e.getMessage(), e);
|
||||
// 解码失败返回 null,不断开连接
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -208,7 +217,7 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
|
||||
*
|
||||
* 物模型标准格式:
|
||||
* {
|
||||
* "eventId": "button_event",
|
||||
* "identifier": "button_event",
|
||||
* "eventTime": 1234567890,
|
||||
* "params": {
|
||||
* "keyId": 1,
|
||||
@@ -225,7 +234,8 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
|
||||
Map<String, Object> result = new HashMap<>();
|
||||
|
||||
// 统一使用一个事件标识符,通过 isLongPress 参数区分短按和长按
|
||||
result.put("eventId", "button_event");
|
||||
// 注意:必须使用 "identifier" 字段,以便 IotDeviceMessageUtils.getIdentifier() 正确提取
|
||||
result.put("identifier", "button_event");
|
||||
|
||||
// 事件时间戳
|
||||
result.put("eventTime", System.currentTimeMillis());
|
||||
|
||||
@@ -43,6 +43,29 @@ public class Jt808Decoder {
|
||||
msgBodyByteStartIndex = 16;
|
||||
}
|
||||
|
||||
// 验证消息体长度,防止越界
|
||||
int expectedBodyLength = msgHeader.getBodyLength();
|
||||
int actualAvailableLength = data.length - msgBodyByteStartIndex - 1; // -1 for checksum
|
||||
|
||||
// 检查数据长度是否足够(至少要包含消息头和校验码)
|
||||
if (data.length < msgBodyByteStartIndex + 1) {
|
||||
log.warn("[bytes2PackageData][数据长度不足: 头部需要={}, 实际={}, 消息ID=0x{}]",
|
||||
msgBodyByteStartIndex + 1, data.length, Integer.toHexString(msgHeader.getId()));
|
||||
ret.setBodyBytes(null); // 使用 null 标记错误状态
|
||||
ret.setCheckSum(0);
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (expectedBodyLength > actualAvailableLength) {
|
||||
log.warn("[bytes2PackageData][消息体长度不匹配: 头部声明={}, 实际可用={}, 数据总长度={}, 消息ID=0x{}]",
|
||||
expectedBodyLength, actualAvailableLength, data.length,
|
||||
Integer.toHexString(msgHeader.getId()));
|
||||
// 使用 null 标记长度不匹配(区别于合法的空消息体如心跳)
|
||||
ret.setBodyBytes(null);
|
||||
ret.setCheckSum(data[data.length - 1]);
|
||||
return ret;
|
||||
}
|
||||
|
||||
byte[] bodyBytes = new byte[msgHeader.getBodyLength()];
|
||||
System.arraycopy(data, msgBodyByteStartIndex, bodyBytes, 0, bodyBytes.length);
|
||||
ret.setBodyBytes(bodyBytes);
|
||||
|
||||
@@ -113,7 +113,11 @@ public class Jt808ProtocolHandler extends AbstractProtocolHandler {
|
||||
String codecType, NetSocket socket,
|
||||
String productKey, String deviceName, String serverId) {
|
||||
try {
|
||||
// 1. 发送消息到消息总线
|
||||
// 1. 标记消息为"协议层已应答",业务层无需再发送 reply
|
||||
// 设置 code = 0 后,isReplyMessage() 返回 true,业务层跳过 reply
|
||||
message.setCode(0);
|
||||
|
||||
// 2. 发送消息到消息总线
|
||||
deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId);
|
||||
|
||||
log.info("[handleBusinessMessage][JT808 业务消息已发送,clientId: {}, method: {}, messageId: {}]",
|
||||
|
||||
@@ -112,17 +112,30 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
||||
|
||||
// 2. 获取消息格式类型
|
||||
String codecType = getMessageCodecType(buffer, socket);
|
||||
if (codecType == null) {
|
||||
// 无法识别消息格式,记录警告但不断开连接
|
||||
byte[] data = buffer.getBytes();
|
||||
String preview = bytesToHex(data, Math.min(32, data.length));
|
||||
log.warn("[processMessage][无法识别消息格式,跳过该消息,clientId: {}, 数据前32字节: {}]",
|
||||
clientId, preview);
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. 解码消息
|
||||
IotDeviceMessage message;
|
||||
try {
|
||||
message = deviceMessageService.decodeDeviceMessage(buffer.getBytes(), codecType);
|
||||
if (message == null) {
|
||||
throw new Exception("解码后消息为空");
|
||||
// 解码失败(如消息格式错误),记录警告但不断开连接
|
||||
log.warn("[processMessage][消息解码失败,跳过该消息,clientId: {}, codecType: {}]",
|
||||
clientId, codecType);
|
||||
return;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// 消息格式错误时抛出异常,由上层处理连接断开
|
||||
throw new Exception("消息解码失败: " + e.getMessage(), e);
|
||||
// 其他异常也记录警告但不断开连接
|
||||
log.warn("[processMessage][消息解码异常,跳过该消息,clientId: {}, codecType: {}, 错误: {}]",
|
||||
clientId, codecType, e.getMessage());
|
||||
return;
|
||||
}
|
||||
|
||||
// 4. 查找协议处理器
|
||||
@@ -147,7 +160,7 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
||||
* 认证结果处理:
|
||||
* - SUCCESS:注册连接,发送上线消息
|
||||
* - PENDING:等待后续认证步骤(如 JT808 注册后等待鉴权)
|
||||
* - FAILURE:认证失败,不做处理(协议处理器已发送失败响应)
|
||||
* - FAILURE:认证失败,如果设备不存在则断开连接,否则只记录日志
|
||||
*
|
||||
* @param clientId 客户端 ID
|
||||
* @param message 认证消息
|
||||
@@ -178,9 +191,16 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
||||
handler.getProtocolType(), result.getMessage());
|
||||
|
||||
} else {
|
||||
// 认证失败:协议处理器已发送失败响应,这里只记录日志
|
||||
// 认证失败
|
||||
String failureReason = result.getMessage();
|
||||
log.warn("[handleAuthentication][认证失败,clientId: {}, 协议: {}, 原因: {}]",
|
||||
clientId, handler.getProtocolType(), result.getMessage());
|
||||
clientId, handler.getProtocolType(), failureReason);
|
||||
|
||||
// 如果设备不存在,断开连接
|
||||
if (failureReason != null && failureReason.contains("设备不存在")) {
|
||||
log.warn("[handleAuthentication][设备不存在,断开连接,clientId: {}]", clientId);
|
||||
socket.close();
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
@@ -373,7 +393,7 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
||||
*/
|
||||
private void cleanupConnection(NetSocket socket) {
|
||||
try {
|
||||
// 1. 发送离线消息(如果已认证)
|
||||
// 1. 发送离线消息(如果<EFBFBD><EFBFBD><EFBFBD>认证)
|
||||
IotTcpConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket);
|
||||
if (connectionInfo != null && connectionInfo.isAuthenticated()) {
|
||||
IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
|
||||
@@ -391,4 +411,26 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 字节数组转十六进制字符串
|
||||
*
|
||||
* @param bytes 字节数组
|
||||
* @param limit 最大长度
|
||||
* @return 十六进制字符串
|
||||
*/
|
||||
private String bytesToHex(byte[] bytes, int limit) {
|
||||
if (bytes == null || bytes.length == 0) {
|
||||
return "";
|
||||
}
|
||||
int length = Math.min(bytes.length, limit);
|
||||
StringBuilder sb = new StringBuilder(length * 2);
|
||||
for (int i = 0; i < length; i++) {
|
||||
sb.append(String.format("%02X", bytes[i]));
|
||||
}
|
||||
if (bytes.length > limit) {
|
||||
sb.append("...");
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
package com.viewsh.module.ops.environment.integration.handler;
|
||||
package com.viewsh.module.ops.environment.integration.listener;
|
||||
|
||||
import com.viewsh.module.ops.environment.service.badge.BadgeDeviceStatusService;
|
||||
import com.viewsh.module.ops.core.event.OrderStateChangedEvent;
|
||||
@@ -9,6 +9,8 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.event.TransactionPhase;
|
||||
import org.springframework.transaction.event.TransactionalEventListener;
|
||||
|
||||
/**
|
||||
* 工牌设备状态事件处理器
|
||||
@@ -42,8 +44,8 @@ public class BadgeDeviceStatusEventHandler {
|
||||
*
|
||||
* @param event 工单状态变更事件
|
||||
*/
|
||||
@EventListener
|
||||
@Async("opsExecutor")
|
||||
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
|
||||
@Async("ops-task-executor")
|
||||
public void onOrderStateChanged(OrderStateChangedEvent event) {
|
||||
try {
|
||||
// 只处理保洁类型的工单
|
||||
@@ -76,7 +78,7 @@ public class BadgeDeviceStatusEventHandler {
|
||||
WorkOrderStatusEnum newStatus = event.getNewStatus();
|
||||
|
||||
switch (newStatus) {
|
||||
case ASSIGNED:
|
||||
case PENDING:
|
||||
// 工单已分配,不影响设备状态(设备可能还在处理其他任务)
|
||||
// 设置设备当前工单
|
||||
badgeDeviceStatusService.setCurrentOrder(deviceId, event.getOrderId());
|
||||
@@ -171,9 +173,9 @@ public class BadgeDeviceStatusEventHandler {
|
||||
* 1. 将被打断的设备状态转为 PAUSED
|
||||
* 2. 记录被打断的工单ID
|
||||
*
|
||||
* @param deviceId 被打断的设备ID
|
||||
* @param deviceId 被打断的设备ID
|
||||
* @param interruptedOrderId 被打断的工单ID
|
||||
* @param urgentOrderId P0紧急工单ID
|
||||
* @param urgentOrderId P0紧急工单ID
|
||||
*/
|
||||
public void handleP0Interrupt(Long deviceId, Long interruptedOrderId, Long urgentOrderId) {
|
||||
log.info("[BadgeDeviceStatusEventHandler] P0紧急任务打断: deviceId={}, interruptedOrderId={}, urgentOrderId={}",
|
||||
|
||||
@@ -84,31 +84,7 @@ public class OpsOrderServiceTest {
|
||||
// 模拟 MyBatis Plus 的 ID 自动回填
|
||||
if (order.getId() == null) {
|
||||
order.setId(1L);
|
||||
@Test
|
||||
void testAssignOrder_FromQueuedStatus_Success() {
|
||||
// Given
|
||||
testOrder.setStatus(WorkOrderStatusEnum.QUEUED.getStatus());
|
||||
OpsOrderAssignReqDTO assignReq = new OpsOrderAssignReqDTO();
|
||||
assignReq.setOrderId(1L);
|
||||
assignReq.setAssigneeId(2001L);
|
||||
assignReq.setRemark("排队后分配");
|
||||
|
||||
when(opsOrderMapper.selectById(1L)).thenReturn(testOrder);
|
||||
|
||||
// When
|
||||
assertDoesNotThrow(() -> opsOrderService.assignOrder(assignReq, OperatorTypeEnum.SYSTEM, null));
|
||||
|
||||
// Then
|
||||
verify(orderStateMachine, times(1)).transition(
|
||||
eq(testOrder),
|
||||
eq(WorkOrderStatusEnum.DISPATCHED),
|
||||
eq(OperatorTypeEnum.SYSTEM),
|
||||
eq(null),
|
||||
eq("排队后分配")
|
||||
);
|
||||
assertEquals(2001L, testOrder.getAssigneeId());
|
||||
}
|
||||
}
|
||||
}
|
||||
return 1;
|
||||
});
|
||||
|
||||
|
||||
Reference in New Issue
Block a user