From 6729df4992f9d991ae6ae8624a8b9e6ddf1c333a Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Fri, 12 Apr 2024 22:15:26 +0800 Subject: [PATCH] =?UTF-8?q?=E5=90=88=E5=B9=B6=E4=B8=BB=E7=BA=BF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/jt1078/proc/request/J0200.java | 68 +++++++++++++++++++ .../service/impl/jt1078ServiceImpl.java | 61 ++++++++--------- 2 files changed, 98 insertions(+), 31 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0200.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0200.java index 8efd29d83..9125dcf8e 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0200.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0200.java @@ -13,6 +13,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationEvent; +import java.text.DecimalFormat; +import java.util.HashMap; +import java.util.Map; + /** * 实时消息上报 * @@ -28,6 +32,9 @@ public class J0200 extends Re { @Override protected Rs decode0(ByteBuf buf, Header header, Session session) { + + + positionInfo = new JTPositionBaseInfo(); int alarmSignInt = buf.readInt(); positionInfo.setAlarmSign(new JTAlarmSign(alarmSignInt)); @@ -43,9 +50,17 @@ public class J0200 extends Re { byte[] timeBytes = new byte[6]; buf.readBytes(timeBytes); positionInfo.setTime(BCDUtil.transform(timeBytes)); + + JTPositionAdditionalInfo positionAdditionalInfo = new JTPositionAdditionalInfo(); + Map additionalMsg = new HashMap<>(); + getAdditionalMsg(buf, positionAdditionalInfo); + // boolean readable = buf.isReadable(); // // 读取附加信息 // if (buf.isReadable()) { +// byte aByte = buf.getByte(0); +// int msgId = (aByte & 0xFF); +// // // 支持1078的视频报警上报 // int alarm = buf.readInt(); // int loss = buf.readInt(); @@ -59,6 +74,59 @@ public class J0200 extends Re { return null; } + private void getAdditionalMsg(ByteBuf buf, JTPositionAdditionalInfo additionalInfo) { + + if (buf.isReadable()) { + int msgId = buf.readUnsignedByte(); + int length = buf.readUnsignedByte(); + ByteBuf byteBuf = buf.readBytes(length); + switch (msgId) { + case 1: + // 里程 + long mileage = byteBuf.readUnsignedInt(); + log.info("[JT-位置汇报]: 里程: {} km", (double)mileage/10); + break; + case 2: + // 油量 + int oil = byteBuf.readUnsignedShort(); + log.info("[JT-位置汇报]: 油量: {} L", (double)oil/10); + break; + case 3: + // 速度 + int speed = byteBuf.readUnsignedShort(); + log.info("[JT-位置汇报]: 速度: {} km/h", (double)speed/10); + break; + case 4: + // 需要人工确认报警事件的 ID + int alarmId = byteBuf.readUnsignedShort(); + log.info("[JT-位置汇报]: 需要人工确认报警事件的 ID: {}", alarmId); + break; + case 5: + byte[] tirePressureBytes = new byte[30]; + // 胎压 + byteBuf.readBytes(tirePressureBytes); + log.info("[JT-位置汇报]: 胎压 {}", tirePressureBytes); + break; + case 6: + // 车厢温度 + short carriageTemperature = byteBuf.readShort(); + log.info("[JT-位置汇报]: 车厢温度 {}摄氏度", carriageTemperature); + break; + case 11: + // 超速报警 + short positionType = byteBuf.readUnsignedByte(); + long positionId = byteBuf.readUnsignedInt(); + log.info("[JT-位置汇报]: 超速报警, 位置类型: {}, 区域或路段 ID: {}", positionType, positionId); + break; + default: + log.info("[JT-位置汇报]: 附加消息ID: {}, 消息长度: {}", msgId, length); + break; + + } + getAdditionalMsg(buf, additionalInfo); + } + } + @Override protected Rs handler(Header header, Session session, Ijt1078Service service) { JTDevice deviceInDb = service.getDevice(header.getTerminalId()); diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java index cc391783d..0fc0e0b4b 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java @@ -14,14 +14,15 @@ import com.genersoft.iot.vmp.jt1078.event.CallbackManager; import com.genersoft.iot.vmp.jt1078.proc.request.J1205; import com.genersoft.iot.vmp.jt1078.proc.response.*; import com.genersoft.iot.vmp.jt1078.service.Ijt1078Service; +import com.genersoft.iot.vmp.media.bean.MediaServer; +import com.genersoft.iot.vmp.media.event.hook.Hook; +import com.genersoft.iot.vmp.media.event.hook.HookData; +import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; +import com.genersoft.iot.vmp.media.event.hook.HookType; +import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; -import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.bean.SSRCInfo; @@ -54,7 +55,7 @@ public class jt1078ServiceImpl implements Ijt1078Service { private RedisTemplate redisTemplate; @Autowired - private ZlmHttpHookSubscribe subscribe; + private HookSubscribe subscribe; @Autowired private IMediaServerService mediaServerService; @@ -122,10 +123,10 @@ public class jt1078ServiceImpl implements Ijt1078Service { StreamInfo streamInfo = (StreamInfo)redisTemplate.opsForValue().get(playKey); if (streamInfo != null) { String mediaServerId = streamInfo.getMediaServerId(); - MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); - if (mediaServerItem != null) { + MediaServer mediaServer = mediaServerService.getOne(mediaServerId); + if (mediaServer != null) { // 查询流是否存在,不存在则删除缓存数据 - JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem, "rtp", "rtsp", streamInfo.getStream()); + JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServer, "rtp", "rtsp", streamInfo.getStream()); if (mediaInfo != null && mediaInfo.getInteger("code") == 0 ) { Boolean online = mediaInfo.getBoolean("online"); if (online != null && online) { @@ -141,25 +142,24 @@ public class jt1078ServiceImpl implements Ijt1078Service { redisTemplate.delete(playKey); } String stream = deviceId + "-" + channelId; - MediaServerItem mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null); - if (mediaServerItem == null) { + MediaServer mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); + if (mediaServer == null) { for (GeneralCallback errorCallback : errorCallbacks) { errorCallback.run(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo); } return; } // 设置hook监听 - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId()); - subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> { - OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) hookParam; + Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", stream, mediaServer.getId()); + subscribe.addSubscribe(hook, (hookData) -> { dynamicTask.stop(playKey); logger.info("[1078-点播] 点播成功, deviceId: {}, channelId: {}", deviceId, channelId); - StreamInfo info = onPublishHandler(mediaServerItemInUse, streamChangedHookParam, deviceId, channelId); + StreamInfo info = onPublishHandler(mediaServer, hookData, deviceId, channelId); for (GeneralCallback errorCallback : errorCallbacks) { errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info); } - subscribe.removeSubscribe(hookSubscribe); + subscribe.removeSubscribe(hook); redisTemplate.opsForValue().set(playKey, info); }); // 设置超时监听 @@ -173,11 +173,11 @@ public class jt1078ServiceImpl implements Ijt1078Service { }, userSetting.getPlayTimeout()); // 开启收流端口 - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, false, false, 0, false, false, 1); + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, stream, null, false, false, 0, false, false, false,1); logger.info("[1078-点播] deviceId: {}, channelId: {}, 端口: {}", deviceId, channelId, ssrcInfo.getPort()); J9101 j9101 = new J9101(); j9101.setChannel(Integer.valueOf(channelId)); - j9101.setIp(mediaServerItem.getSdpIp()); + j9101.setIp(mediaServer.getSdpIp()); j9101.setRate(1); j9101.setTcpPort(ssrcInfo.getPort()); j9101.setUdpPort(ssrcInfo.getPort()); @@ -187,8 +187,8 @@ public class jt1078ServiceImpl implements Ijt1078Service { } - public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, OnStreamChangedHookParam hookParam, String deviceId, String channelId) { - StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", hookParam.getStream(), hookParam.getTracks(), null); + public StreamInfo onPublishHandler(MediaServer mediaServerItem, HookData hookData, String deviceId, String channelId) { + StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", hookData.getStream(), hookData.getMediaInfo(), null); streamInfo.setDeviceID(deviceId); streamInfo.setChannelId(channelId); return streamInfo; @@ -293,10 +293,10 @@ public class jt1078ServiceImpl implements Ijt1078Service { StreamInfo streamInfo = (StreamInfo)redisTemplate.opsForValue().get(playbackKey); if (streamInfo != null) { String mediaServerId = streamInfo.getMediaServerId(); - MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); - if (mediaServerItem != null) { + MediaServer mediaServer = mediaServerService.getOne(mediaServerId); + if (mediaServer != null) { // 查询流是否存在,不存在则删除缓存数据 - JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem, "rtp", "rtsp", streamInfo.getStream()); + JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServer, "rtp", "rtsp", streamInfo.getStream()); if (mediaInfo != null && mediaInfo.getInteger("code") == 0 ) { Boolean online = mediaInfo.getBoolean("online"); if (online != null && online) { @@ -314,20 +314,19 @@ public class jt1078ServiceImpl implements Ijt1078Service { String startTimeParam = DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(startTime); String endTimeParam = DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(endTime); String stream = deviceId + "-" + channelId + "-" + startTimeParam + "-" + endTimeParam; - MediaServerItem mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null); - if (mediaServerItem == null) { + MediaServer mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); + if (mediaServer == null) { for (GeneralCallback errorCallback : errorCallbacks) { errorCallback.run(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo); } return; } // 设置hook监听 - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId()); - subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> { - OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) hookParam; + Hook hookSubscribe = Hook.getInstance(HookType.on_media_arrival, "rtp", stream, mediaServer.getId()); + subscribe.addSubscribe(hookSubscribe, (hookData) -> { dynamicTask.stop(playbackKey); logger.info("[1078-回放] 回放成功, logInfo: {}", logInfo); - StreamInfo info = onPublishHandler(mediaServerItemInUse, streamChangedHookParam, deviceId, channelId); + StreamInfo info = onPublishHandler(mediaServer, hookData, deviceId, channelId); for (GeneralCallback errorCallback : errorCallbacks) { errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info); @@ -346,11 +345,11 @@ public class jt1078ServiceImpl implements Ijt1078Service { }, userSetting.getPlayTimeout()); // 开启收流端口 - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, false, false, 0, false, false, 1); + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, stream, null, false, false, 0, false, false, false, 1); logger.info("[1078-回放] logInfo: {}, 端口: {}", logInfo, ssrcInfo.getPort()); J9201 j9201 = new J9201(); j9201.setChannel(Integer.parseInt(channelId)); - j9201.setIp(mediaServerItem.getSdpIp()); + j9201.setIp(mediaServer.getSdpIp()); j9201.setRate(0); j9201.setPlaybackType(0); j9201.setPlaybackSpeed(0);