合并主线

This commit is contained in:
648540858
2024-04-12 22:15:26 +08:00
parent 9f3cab6486
commit 6729df4992
2 changed files with 98 additions and 31 deletions

View File

@@ -13,6 +13,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEvent;
import java.text.DecimalFormat;
import java.util.HashMap;
import java.util.Map;
/**
* 实时消息上报
*
@@ -28,6 +32,9 @@ public class J0200 extends Re {
@Override
protected Rs decode0(ByteBuf buf, Header header, Session session) {
positionInfo = new JTPositionBaseInfo();
int alarmSignInt = buf.readInt();
positionInfo.setAlarmSign(new JTAlarmSign(alarmSignInt));
@@ -43,9 +50,17 @@ public class J0200 extends Re {
byte[] timeBytes = new byte[6];
buf.readBytes(timeBytes);
positionInfo.setTime(BCDUtil.transform(timeBytes));
JTPositionAdditionalInfo positionAdditionalInfo = new JTPositionAdditionalInfo();
Map<Integer, byte[]> additionalMsg = new HashMap<>();
getAdditionalMsg(buf, positionAdditionalInfo);
// boolean readable = buf.isReadable();
// // 读取附加信息
// if (buf.isReadable()) {
// byte aByte = buf.getByte(0);
// int msgId = (aByte & 0xFF);
//
// // 支持1078的视频报警上报
// int alarm = buf.readInt();
// int loss = buf.readInt();
@@ -59,6 +74,59 @@ public class J0200 extends Re {
return null;
}
private void getAdditionalMsg(ByteBuf buf, JTPositionAdditionalInfo additionalInfo) {
if (buf.isReadable()) {
int msgId = buf.readUnsignedByte();
int length = buf.readUnsignedByte();
ByteBuf byteBuf = buf.readBytes(length);
switch (msgId) {
case 1:
// 里程
long mileage = byteBuf.readUnsignedInt();
log.info("[JT-位置汇报]: 里程: {} km", (double)mileage/10);
break;
case 2:
// 油量
int oil = byteBuf.readUnsignedShort();
log.info("[JT-位置汇报]: 油量: {} L", (double)oil/10);
break;
case 3:
// 速度
int speed = byteBuf.readUnsignedShort();
log.info("[JT-位置汇报]: 速度: {} km/h", (double)speed/10);
break;
case 4:
// 需要人工确认报警事件的 ID
int alarmId = byteBuf.readUnsignedShort();
log.info("[JT-位置汇报]: 需要人工确认报警事件的 ID {}", alarmId);
break;
case 5:
byte[] tirePressureBytes = new byte[30];
// 胎压
byteBuf.readBytes(tirePressureBytes);
log.info("[JT-位置汇报]: 胎压 {}", tirePressureBytes);
break;
case 6:
// 车厢温度
short carriageTemperature = byteBuf.readShort();
log.info("[JT-位置汇报]: 车厢温度 {}摄氏度", carriageTemperature);
break;
case 11:
// 超速报警
short positionType = byteBuf.readUnsignedByte();
long positionId = byteBuf.readUnsignedInt();
log.info("[JT-位置汇报]: 超速报警, 位置类型: {}, 区域或路段 ID: {}", positionType, positionId);
break;
default:
log.info("[JT-位置汇报]: 附加消息ID {} 消息长度: {}", msgId, length);
break;
}
getAdditionalMsg(buf, additionalInfo);
}
}
@Override
protected Rs handler(Header header, Session session, Ijt1078Service service) {
JTDevice deviceInDb = service.getDevice(header.getTerminalId());

View File

@@ -14,14 +14,15 @@ import com.genersoft.iot.vmp.jt1078.event.CallbackManager;
import com.genersoft.iot.vmp.jt1078.proc.request.J1205;
import com.genersoft.iot.vmp.jt1078.proc.response.*;
import com.genersoft.iot.vmp.jt1078.service.Ijt1078Service;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookData;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IMediaService;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
@@ -54,7 +55,7 @@ public class jt1078ServiceImpl implements Ijt1078Service {
private RedisTemplate<Object, Object> redisTemplate;
@Autowired
private ZlmHttpHookSubscribe subscribe;
private HookSubscribe subscribe;
@Autowired
private IMediaServerService mediaServerService;
@@ -122,10 +123,10 @@ public class jt1078ServiceImpl implements Ijt1078Service {
StreamInfo streamInfo = (StreamInfo)redisTemplate.opsForValue().get(playKey);
if (streamInfo != null) {
String mediaServerId = streamInfo.getMediaServerId();
MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
if (mediaServerItem != null) {
MediaServer mediaServer = mediaServerService.getOne(mediaServerId);
if (mediaServer != null) {
// 查询流是否存在,不存在则删除缓存数据
JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem, "rtp", "rtsp", streamInfo.getStream());
JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServer, "rtp", "rtsp", streamInfo.getStream());
if (mediaInfo != null && mediaInfo.getInteger("code") == 0 ) {
Boolean online = mediaInfo.getBoolean("online");
if (online != null && online) {
@@ -141,25 +142,24 @@ public class jt1078ServiceImpl implements Ijt1078Service {
redisTemplate.delete(playKey);
}
String stream = deviceId + "-" + channelId;
MediaServerItem mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null);
if (mediaServerItem == null) {
MediaServer mediaServer = mediaServerService.getMediaServerForMinimumLoad(null);
if (mediaServer == null) {
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo);
}
return;
}
// 设置hook监听
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId());
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) hookParam;
Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", stream, mediaServer.getId());
subscribe.addSubscribe(hook, (hookData) -> {
dynamicTask.stop(playKey);
logger.info("[1078-点播] 点播成功, deviceId {} channelId {}", deviceId, channelId);
StreamInfo info = onPublishHandler(mediaServerItemInUse, streamChangedHookParam, deviceId, channelId);
StreamInfo info = onPublishHandler(mediaServer, hookData, deviceId, channelId);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info);
}
subscribe.removeSubscribe(hookSubscribe);
subscribe.removeSubscribe(hook);
redisTemplate.opsForValue().set(playKey, info);
});
// 设置超时监听
@@ -173,11 +173,11 @@ public class jt1078ServiceImpl implements Ijt1078Service {
}, userSetting.getPlayTimeout());
// 开启收流端口
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, false, false, 0, false, false, 1);
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, stream, null, false, false, 0, false, false, false,1);
logger.info("[1078-点播] deviceId {} channelId {} 端口: {}", deviceId, channelId, ssrcInfo.getPort());
J9101 j9101 = new J9101();
j9101.setChannel(Integer.valueOf(channelId));
j9101.setIp(mediaServerItem.getSdpIp());
j9101.setIp(mediaServer.getSdpIp());
j9101.setRate(1);
j9101.setTcpPort(ssrcInfo.getPort());
j9101.setUdpPort(ssrcInfo.getPort());
@@ -187,8 +187,8 @@ public class jt1078ServiceImpl implements Ijt1078Service {
}
public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, OnStreamChangedHookParam hookParam, String deviceId, String channelId) {
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", hookParam.getStream(), hookParam.getTracks(), null);
public StreamInfo onPublishHandler(MediaServer mediaServerItem, HookData hookData, String deviceId, String channelId) {
StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", hookData.getStream(), hookData.getMediaInfo(), null);
streamInfo.setDeviceID(deviceId);
streamInfo.setChannelId(channelId);
return streamInfo;
@@ -293,10 +293,10 @@ public class jt1078ServiceImpl implements Ijt1078Service {
StreamInfo streamInfo = (StreamInfo)redisTemplate.opsForValue().get(playbackKey);
if (streamInfo != null) {
String mediaServerId = streamInfo.getMediaServerId();
MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
if (mediaServerItem != null) {
MediaServer mediaServer = mediaServerService.getOne(mediaServerId);
if (mediaServer != null) {
// 查询流是否存在,不存在则删除缓存数据
JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem, "rtp", "rtsp", streamInfo.getStream());
JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServer, "rtp", "rtsp", streamInfo.getStream());
if (mediaInfo != null && mediaInfo.getInteger("code") == 0 ) {
Boolean online = mediaInfo.getBoolean("online");
if (online != null && online) {
@@ -314,20 +314,19 @@ public class jt1078ServiceImpl implements Ijt1078Service {
String startTimeParam = DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(startTime);
String endTimeParam = DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(endTime);
String stream = deviceId + "-" + channelId + "-" + startTimeParam + "-" + endTimeParam;
MediaServerItem mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null);
if (mediaServerItem == null) {
MediaServer mediaServer = mediaServerService.getMediaServerForMinimumLoad(null);
if (mediaServer == null) {
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo);
}
return;
}
// 设置hook监听
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId());
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) hookParam;
Hook hookSubscribe = Hook.getInstance(HookType.on_media_arrival, "rtp", stream, mediaServer.getId());
subscribe.addSubscribe(hookSubscribe, (hookData) -> {
dynamicTask.stop(playbackKey);
logger.info("[1078-回放] 回放成功, logInfo {}", logInfo);
StreamInfo info = onPublishHandler(mediaServerItemInUse, streamChangedHookParam, deviceId, channelId);
StreamInfo info = onPublishHandler(mediaServer, hookData, deviceId, channelId);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info);
@@ -346,11 +345,11 @@ public class jt1078ServiceImpl implements Ijt1078Service {
}, userSetting.getPlayTimeout());
// 开启收流端口
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, false, false, 0, false, false, 1);
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, stream, null, false, false, 0, false, false, false, 1);
logger.info("[1078-回放] logInfo {} 端口: {}", logInfo, ssrcInfo.getPort());
J9201 j9201 = new J9201();
j9201.setChannel(Integer.parseInt(channelId));
j9201.setIp(mediaServerItem.getSdpIp());
j9201.setIp(mediaServer.getSdpIp());
j9201.setRate(0);
j9201.setPlaybackType(0);
j9201.setPlaybackSpeed(0);