dialog去除以及异常情况处理优化

This commit is contained in:
648540858
2022-09-23 22:45:23 +08:00
parent a574ff0944
commit cd117ed228
53 changed files with 3106 additions and 3053 deletions

View File

@@ -26,6 +26,9 @@ import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
@@ -95,7 +98,11 @@ public class DeviceServiceImpl implements IDeviceService {
logger.info("[设备上线,首次注册]: {},查询设备信息以及通道信息", device.getDeviceId());
deviceMapper.add(device);
redisCatchStorage.updateDevice(device);
commander.deviceInfoQuery(device);
try {
commander.deviceInfoQuery(device);
} catch (InvalidArgumentException | SipException | ParseException e) {
logger.error("[命令发送失败] 查询设备信息: {}", e.getMessage());
}
sync(device);
}else {
if(device.getOnline() == 0){
@@ -104,7 +111,11 @@ public class DeviceServiceImpl implements IDeviceService {
logger.info("[设备上线,离线状态下重新注册]: {},查询设备信息以及通道信息", device.getDeviceId());
deviceMapper.update(device);
redisCatchStorage.updateDevice(device);
commander.deviceInfoQuery(device);
try {
commander.deviceInfoQuery(device);
} catch (InvalidArgumentException | SipException | ParseException e) {
logger.error("[命令发送失败] 查询设备信息: {}", e.getMessage());
}
sync(device);
// TODO 如果设备下的通道级联到了其他平台那么需要发送事件或者notify给上级平台
}else {
@@ -129,6 +140,7 @@ public class DeviceServiceImpl implements IDeviceService {
@Override
public void offline(String deviceId) {
logger.info("[设备离线] device{}", deviceId);
Device device = deviceMapper.getDeviceByDeviceId(deviceId);
if (device == null) {
return;
@@ -238,15 +250,28 @@ public class DeviceServiceImpl implements IDeviceService {
}
int sn = (int)((Math.random()*9+1)*100000);
catalogResponseMessageHandler.setChannelSyncReady(device, sn);
sipCommander.catalogQuery(device, sn, event -> {
String errorMsg = String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg);
try {
sipCommander.catalogQuery(device, sn, event -> {
String errorMsg = String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg);
catalogResponseMessageHandler.setChannelSyncEnd(device.getDeviceId(), errorMsg);
});
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[同步通道], 信令发送失败:{}", e.getMessage() );
String errorMsg = String.format("同步通道失败,信令发送失败: %s", e.getMessage());
catalogResponseMessageHandler.setChannelSyncEnd(device.getDeviceId(), errorMsg);
});
}
}
@Override
public Device queryDevice(String deviceId) {
return deviceMapper.getDeviceByDeviceId(deviceId);
Device device = redisCatchStorage.getDevice(deviceId);
if (device == null) {
device = deviceMapper.getDeviceByDeviceId(deviceId);
if (device != null) {
redisCatchStorage.updateDevice(device);
}
}
return device;
}
@Override
@@ -266,7 +291,11 @@ public class DeviceServiceImpl implements IDeviceService {
if (device == null || device.getOnline() == 0) {
return;
}
sipCommander.deviceStatusQuery(device, null);
try {
sipCommander.deviceStatusQuery(device, null);
} catch (InvalidArgumentException | SipException | ParseException e) {
logger.error("[命令发送失败] 设备状态查询: {}", e.getMessage());
}
}

View File

@@ -20,7 +20,10 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import javax.sip.TimeoutEvent;
import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -99,9 +102,13 @@ public class PlatformServiceImpl implements IPlatformService {
if (parentPlatform.isEnable()) {
// 保存时启用就发送注册
// 注册成功时由程序直接调用了online方法
commanderForPlatform.register(parentPlatform, eventResult -> {
logger.info("[国标级联] {},添加向上级注册失败,请确定上级平台可用时重新保存", parentPlatform.getServerGBId());
}, null);
try {
commanderForPlatform.register(parentPlatform, eventResult -> {
logger.info("[国标级联] {},添加向上级注册失败,请确定上级平台可用时重新保存", parentPlatform.getServerGBId());
}, null);
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[命令发送失败] 国标级联: {}", e.getMessage());
}
}
return result > 0;
}
@@ -130,46 +137,62 @@ public class PlatformServiceImpl implements IPlatformService {
// 添加注册任务
dynamicTask.startDelay(registerTaskKey,
// 注册失败注册成功时由程序直接调用了online方法
()->commanderForPlatform.register(parentPlatform, eventResult -> offline(parentPlatform),null),
()-> {
try {
commanderForPlatform.register(parentPlatform, eventResult -> offline(parentPlatform),null);
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[命令发送失败] 国标级联定时注册: {}", e.getMessage());
}
},
(parentPlatform.getExpires() - 10) *1000);
final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId();
if (!dynamicTask.contains(keepaliveTaskKey)) {
// 添加心跳任务
dynamicTask.startCron(keepaliveTaskKey,
()-> commanderForPlatform.keepalive(parentPlatform, eventResult -> {
// 心跳失败
if (eventResult.type == SipSubscribe.EventResultType.timeout) {
// 心跳超时
ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
// 此时是第三次心跳超时, 平台离线
if (platformCatch.getKeepAliveReply() == 2) {
// 设置平台离线,并重新注册
offline(parentPlatform);
logger.info("[国标级联] {},三次心跳超时后再次发起注册", parentPlatform.getServerGBId());
commanderForPlatform.register(parentPlatform, eventResult1 -> {
logger.info("[国标级联] {},三次心跳超时后再次发起注册仍然失败开始定时发起注册间隔为1分钟", parentPlatform.getServerGBId());
// 添加注册任务
dynamicTask.startCron(registerTaskKey,
// 注册失败注册成功时由程序直接调用了online方法
()->logger.info("[国标级联] {},平台离线后持续发起注册,失败", parentPlatform.getServerGBId()),
60*1000);
}, null);
}
()-> {
try {
commanderForPlatform.keepalive(parentPlatform, eventResult -> {
// 心跳失败
if (eventResult.type == SipSubscribe.EventResultType.timeout) {
// 心跳超时
ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
// 此时是第三次心跳超时, 平台离线
if (platformCatch.getKeepAliveReply() == 2) {
// 设置平台离线,并重新注册
offline(parentPlatform);
logger.info("[国标级联] {},三次心跳超时后再次发起注册", parentPlatform.getServerGBId());
try {
commanderForPlatform.register(parentPlatform, eventResult1 -> {
logger.info("[国标级联] {}三次心跳超时后再次发起注册仍然失败开始定时发起注册间隔为1分钟", parentPlatform.getServerGBId());
// 添加注册任务
dynamicTask.startCron(registerTaskKey,
// 注册失败注册成功时由程序直接调用了online方法
()->logger.info("[国标级联] {},平台离线后持续发起注册,失败", parentPlatform.getServerGBId()),
60*1000);
}, null);
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[命令发送失败] 国标级联 注册: {}", e.getMessage());
}
}
}else {
logger.warn("[国标级联]发送心跳收到错误code {}, msg: {}", eventResult.statusCode, eventResult.msg);
}
}else {
logger.warn("[国标级联]发送心跳收到错误code {}, msg: {}", eventResult.statusCode, eventResult.msg);
}
}, eventResult -> {
// 心跳成功
// 清空之前的心跳超时计数
ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
if (platformCatch.getKeepAliveReply() > 0) {
platformCatch.setKeepAliveReply(0);
redisCatchStorage.updatePlatformCatchInfo(platformCatch);
}, eventResult -> {
// 心跳成功
// 清空之前的心跳超时计数
ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
if (platformCatch.getKeepAliveReply() > 0) {
platformCatch.setKeepAliveReply(0);
redisCatchStorage.updatePlatformCatchInfo(platformCatch);
}
});
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 发送心跳: {}", e.getMessage());
}
}),
},
(parentPlatform.getKeepTimeout() - 10)*1000);
}
}
@@ -225,14 +248,18 @@ public class PlatformServiceImpl implements IPlatformService {
@Override
public void login(ParentPlatform parentPlatform) {
final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
commanderForPlatform.register(parentPlatform, eventResult1 -> {
logger.info("[国标级联] {}开始定时发起注册间隔为1分钟", parentPlatform.getServerGBId());
// 添加注册任务
dynamicTask.startCron(registerTaskKey,
// 注册失败注册成功时由程序直接调用了online方法
()->logger.info("[国标级联] {},平台离线后持续发起注册失败", parentPlatform.getServerGBId()),
60*1000);
}, null);
try {
commanderForPlatform.register(parentPlatform, eventResult1 -> {
logger.info("[国标级联] {}开始定时发起注册间隔为1分钟", parentPlatform.getServerGBId());
// 添加注册任务
dynamicTask.startCron(registerTaskKey,
// 注册失败注册成功时由程序直接调用了online方法
()->logger.info("[国标级联] {},平台离线后持续发起注册,失败", parentPlatform.getServerGBId()),
60*1000);
}, null);
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[命令发送失败] 国标级联注册: {}", e.getMessage());
}
}
@Override
@@ -259,7 +286,12 @@ public class PlatformServiceImpl implements IPlatformService {
continue;
}
// 发送GPS消息
commanderForPlatform.sendNotifyMobilePosition(platform, gpsMsgInfo, subscribe);
try {
commanderForPlatform.sendNotifyMobilePosition(platform, gpsMsgInfo, subscribe);
} catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException |
IllegalAccessException e) {
logger.error("[命令发送失败] 国标级联 移动位置通知: {}", e.getMessage());
}
}
}
}

View File

@@ -2,11 +2,17 @@ package com.genersoft.iot.vmp.service.impl;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.ParseException;
import java.util.*;
import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent;
import javax.sip.SipException;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -22,13 +28,6 @@ import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamCallback;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
@@ -92,6 +91,10 @@ public class PlayServiceImpl implements IPlayService {
@Autowired
private VideoStreamSessionManager streamSession;
@Autowired
private IDeviceService deviceService;
@Autowired
private UserSetting userSetting;
@@ -261,14 +264,14 @@ public class PlayServiceImpl implements IPlayService {
System.out.println("设置超时任务: " + timeOutTaskKey);
dynamicTask.startDelay( timeOutTaskKey,()->{
SIPDialog dialog = streamSession.getDialogByStream(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
if (dialog != null) {
logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, finalSsrcInfo.getPort(), finalSsrcInfo.getSsrc());
timeoutCallback.run(1, "收流超时");
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
cmder.streamByeCmd(device.getDeviceId(), channelId, finalSsrcInfo.getStream(), null);
}else {
logger.info("[点播超时] 消息未响应 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, finalSsrcInfo.getPort(), finalSsrcInfo.getSsrc());
logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, finalSsrcInfo.getPort(), finalSsrcInfo.getSsrc());
timeoutCallback.run(1, "收流超时");
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
try {
cmder.streamByeCmd(device, channelId, finalSsrcInfo.getStream(), null);
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[点播超时] 发送BYE失败 {}", e.getMessage());
} catch (SsrcTransactionNotFoundException e) {
timeoutCallback.run(0, "点播超时");
mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
@@ -282,73 +285,87 @@ public class PlayServiceImpl implements IPlayService {
logger.info("[点播端口分配异常]deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo);
return;
}
cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
logger.info("收到订阅消息: " + response.toJSONString());
System.out.println("停止超时任务 " + timeOutTaskKey);
dynamicTask.stop(timeOutTaskKey);
// hook响应
onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId, uuid);
hookEvent.response(mediaServerItemInuse, response);
logger.info("[点播成功] deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
try {
cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
logger.info("收到订阅消息 " + response.toJSONString());
System.out.println("停止超时任务: " + timeOutTaskKey);
dynamicTask.stop(timeOutTaskKey);
// hook响应
onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId, uuid);
hookEvent.response(mediaServerItemInuse, response);
logger.info("[点播成功] deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
}, (event) -> {
ResponseEvent responseEvent = (ResponseEvent)event.event;
String contentString = new String(responseEvent.getResponse().getRawContent());
// 获取ssrc
int ssrcIndex = contentString.indexOf("y=");
// 检查是否有y字段
if (ssrcIndex >= 0) {
//ssrc规定长度为10字节不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
// 查询到ssrc不一致且开启了ssrc校验则需要针对处理
if (ssrc.equals(ssrcInResponse)) {
return;
}
logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse );
if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
logger.info("[点播消息] SSRC修正 {}->{}", ssrc, ssrcInResponse);
if (!mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) {
// ssrc 不可用
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
event.msg = "下级自定义了ssrc,但是此ssrc不可用";
event.statusCode = 400;
errorEvent.response(event);
}, (event) -> {
ResponseEvent responseEvent = (ResponseEvent)event.event;
String contentString = new String(responseEvent.getResponse().getRawContent());
// 获取ssrc
int ssrcIndex = contentString.indexOf("y=");
// 检查是否有y字段
if (ssrcIndex >= 0) {
//ssrc规定长度为10字节不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
// 查询到ssrc不一致且开启了ssrc校验则需要针对处理
if (ssrc.equals(ssrcInResponse)) {
return;
}
logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse );
if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
logger.info("[点播消息] SSRC修正 {}->{}", ssrc, ssrcInResponse);
if (!mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) {
// ssrc 不可用
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
event.msg = "下级自定义了ssrc,但是此ssrc不可用";
event.statusCode = 400;
errorEvent.response(event);
return;
}
// 单端口模式streamId也有变化需要重新设置监听
if (!mediaServerItem.isRtpEnable()) {
// 添加订阅
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId());
subscribe.removeSubscribe(hookSubscribe);
hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response)->{
logger.info("[ZLM HOOK] ssrc修正后收到订阅消息 " + response.toJSONString());
dynamicTask.stop(timeOutTaskKey);
// hook响应
onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid);
hookEvent.response(mediaServerItemInUse, response);
});
}
// 关闭rtp server
mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
// 重新开启ssrc server
mediaServerService.openRTPServer(mediaServerItem, finalSsrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, finalSsrcInfo.getPort());
// 单端口模式streamId也有变化需要重新设置监听
if (!mediaServerItem.isRtpEnable()) {
// 添加订阅
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId());
subscribe.removeSubscribe(hookSubscribe);
hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response)->{
logger.info("[ZLM HOOK] ssrc修正后收到订阅消息 " + response.toJSONString());
dynamicTask.stop(timeOutTaskKey);
// hook响应
onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid);
hookEvent.response(mediaServerItemInUse, response);
});
}
// 关闭rtp server
mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
// 重新开启ssrc server
mediaServerService.openRTPServer(mediaServerItem, finalSsrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, finalSsrcInfo.getPort());
}
}
}, (event) -> {
}, (event) -> {
dynamicTask.stop(timeOutTaskKey);
mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
errorEvent.response(event);
});
} catch (InvalidArgumentException | SipException | ParseException e) {
logger.error("[命令发送失败] 点播消息: {}", e.getMessage());
dynamicTask.stop(timeOutTaskKey);
mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
errorEvent.response(event);
});
SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(new CmdSendFailEvent(null));
eventResult.msg = "命令发送失败";
errorEvent.response(eventResult);
}
}
@Override
@@ -439,17 +456,18 @@ public class PlayServiceImpl implements IPlayService {
playBackResult.setCode(ErrorCode.ERROR100.getCode());
playBackResult.setMsg("回放超时");
playBackResult.setData(requestMessage);
SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream());
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
if (dialog != null) {
try {
cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[录像流]回放超时 发送BYE失败 {}", e.getMessage());
} catch (SsrcTransactionNotFoundException e) {
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
}else {
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
}
cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
// 回复之前所有的点播请求
playBackCallback.call(playBackResult);
result.setResult(WVPResult.fail(ErrorCode.ERROR100.getCode(), "回放超时"));
@@ -489,59 +507,67 @@ public class PlayServiceImpl implements IPlayService {
playBackCallback.call(playBackResult);
};
cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, infoCallBack,
hookEvent, eventResult -> {
if (eventResult.type == SipSubscribe.EventResultType.response) {
ResponseEvent responseEvent = (ResponseEvent)eventResult.event;
String contentString = new String(responseEvent.getResponse().getRawContent());
// 获取ssrc
int ssrcIndex = contentString.indexOf("y=");
// 检查是否有y字段
if (ssrcIndex >= 0) {
//ssrc规定长度为10字节不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
// 查询到ssrc不一致且开启了ssrc校验则需要针对处理
if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
return;
}
logger.info("[回放消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse );
if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
logger.info("[回放消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
if (!mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) {
// ssrc 不可用
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
eventResult.msg = "下级自定义了ssrc,但是此ssrc不可用";
eventResult.statusCode = 400;
errorEvent.response(eventResult);
try {
cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, infoCallBack,
hookEvent, eventResult -> {
if (eventResult.type == SipSubscribe.EventResultType.response) {
ResponseEvent responseEvent = (ResponseEvent)eventResult.event;
String contentString = new String(responseEvent.getResponse().getRawContent());
// 获取ssrc
int ssrcIndex = contentString.indexOf("y=");
// 检查是否有y字段
if (ssrcIndex >= 0) {
//ssrc规定长度为10字节不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
// 查询到ssrc不一致且开启了ssrc校验则需要针对处理
if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
return;
}
logger.info("[回放消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse );
if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
logger.info("[回放消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
// 单端口模式streamId也有变化需要重新设置监听
if (!mediaServerItem.isRtpEnable()) {
// 添加订阅
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
subscribe.removeSubscribe(hookSubscribe);
hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response)->{
logger.info("[ZLM HOOK] ssrc修正后收到订阅消息 " + response.toJSONString());
dynamicTask.stop(playBackTimeOutTaskKey);
// hook响应
onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid);
hookEvent.call(new InviteStreamInfo(mediaServerItem, null, eventResult.callId, "rtp", ssrcInfo.getStream()));
});
if (!mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) {
// ssrc 不可用
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
eventResult.msg = "下级自定义了ssrc,但是此ssrc不可用";
eventResult.statusCode = 400;
errorEvent.response(eventResult);
return;
}
// 单端口模式streamId也有变化需要重新设置监听
if (!mediaServerItem.isRtpEnable()) {
// 添加订阅
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
subscribe.removeSubscribe(hookSubscribe);
hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response)->{
logger.info("[ZLM HOOK] ssrc修正后收到订阅消息 " + response.toJSONString());
dynamicTask.stop(playBackTimeOutTaskKey);
// hook响应
onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid);
hookEvent.call(new InviteStreamInfo(mediaServerItem, null, eventResult.callId, "rtp", ssrcInfo.getStream()));
});
}
// 关闭rtp server
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
// 重新开启ssrc server
mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), true, ssrcInfo.getPort());
}
// 关闭rtp server
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
// 重新开启ssrc server
mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), true, ssrcInfo.getPort());
}
}
}
}, errorEvent);
}, errorEvent);
} catch (InvalidArgumentException | SipException | ParseException e) {
logger.error("[命令发送失败] 回放: {}", e.getMessage());
SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(new CmdSendFailEvent(null));
eventResult.msg = "命令发送失败";
errorEvent.response(eventResult);
}
return result;
}
@@ -587,46 +613,57 @@ public class PlayServiceImpl implements IPlayService {
downloadResult.setCode(ErrorCode.ERROR100.getCode());
downloadResult.setMsg("录像下载请求超时");
hookCallBack.call(downloadResult);
SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream());
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
if (dialog != null) {
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
}else {
try {
cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[录像流]录像下载请求超时, 发送BYE失败 {}", e.getMessage());
} catch (SsrcTransactionNotFoundException e) {
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
}
cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
// 回复之前所有的点播请求
hookCallBack.call(downloadResult);
}, userSetting.getPlayTimeout());
cmder.downloadStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed, infoCallBack,
inviteStreamInfo -> {
logger.info("收到订阅消息: " + inviteStreamInfo.getResponse().toJSONString());
dynamicTask.stop(downLoadTimeOutTaskKey);
StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
streamInfo.setStartTime(startTime);
streamInfo.setEndTime(endTime);
redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId());
wvpResult.setCode(ErrorCode.SUCCESS.getCode());
wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
wvpResult.setData(streamInfo);
downloadResult.setCode(ErrorCode.SUCCESS.getCode());
downloadResult.setMsg(ErrorCode.SUCCESS.getMsg());
downloadResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());
downloadResult.setResponse(inviteStreamInfo.getResponse());
hookCallBack.call(downloadResult);
}, event -> {
dynamicTask.stop(downLoadTimeOutTaskKey);
downloadResult.setCode(ErrorCode.ERROR100.getCode());
downloadResult.setMsg(String.format("录像下载失败, 错误码: %s, %s", event.statusCode, event.msg));
wvpResult.setCode(ErrorCode.ERROR100.getCode());
wvpResult.setMsg(String.format("录像下载失败, 错误码: %s, %s", event.statusCode, event.msg));
downloadResult.setEvent(event);
hookCallBack.call(downloadResult);
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
});
SipSubscribe.Event errorEvent = event -> {
dynamicTask.stop(downLoadTimeOutTaskKey);
downloadResult.setCode(ErrorCode.ERROR100.getCode());
downloadResult.setMsg(String.format("录像下载失败, 错误码: %s, %s", event.statusCode, event.msg));
wvpResult.setCode(ErrorCode.ERROR100.getCode());
wvpResult.setMsg(String.format("录像下载失败, 错误码: %s, %s", event.statusCode, event.msg));
downloadResult.setEvent(event);
hookCallBack.call(downloadResult);
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
};
try {
cmder.downloadStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed, infoCallBack,
inviteStreamInfo -> {
logger.info("收到订阅消息: " + inviteStreamInfo.getResponse().toJSONString());
dynamicTask.stop(downLoadTimeOutTaskKey);
StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
streamInfo.setStartTime(startTime);
streamInfo.setEndTime(endTime);
redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId());
wvpResult.setCode(ErrorCode.SUCCESS.getCode());
wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
wvpResult.setData(streamInfo);
downloadResult.setCode(ErrorCode.SUCCESS.getCode());
downloadResult.setMsg(ErrorCode.SUCCESS.getMsg());
downloadResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());
downloadResult.setResponse(inviteStreamInfo.getResponse());
hookCallBack.call(downloadResult);
}, errorEvent);
} catch (InvalidArgumentException | SipException | ParseException e) {
logger.error("[命令发送失败] 录像下载: {}", e.getMessage());
SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(new CmdSendFailEvent(null));
eventResult.msg = "命令发送失败";
errorEvent.response(eventResult);
}
return result;
}
@@ -705,7 +742,11 @@ public class PlayServiceImpl implements IPlayService {
for (SendRtpItem sendRtpItem : sendRtpItems) {
if (sendRtpItem.getMediaServerId().equals(mediaServerId)) {
ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId());
try {
sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId());
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
}
}
}
}
@@ -714,8 +755,17 @@ public class PlayServiceImpl implements IPlayService {
if (allSsrc.size() > 0) {
for (SsrcTransaction ssrcTransaction : allSsrc) {
if(ssrcTransaction.getMediaServerId().equals(mediaServerId)) {
cmder.streamByeCmd(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(),
ssrcTransaction.getStream(), null);
Device device = deviceService.queryDevice(ssrcTransaction.getDeviceId());
if (device == null) {
continue;
}
try {
cmder.streamByeCmd(device, ssrcTransaction.getChannelId(),
ssrcTransaction.getStream(), null);
} catch (InvalidArgumentException | ParseException | SipException |
SsrcTransactionNotFoundException e) {
logger.error("[zlm离线]为正在使用此zlm的设备 发送BYE失败 {}", e.getMessage());
}
}
}
}

View File

@@ -16,7 +16,9 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import javax.validation.constraints.NotNull;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -78,16 +80,28 @@ public class RedisAlarmMsgListener implements MessageListener {
List<ParentPlatform> parentPlatforms = storage.queryEnableParentPlatformList(true);
if (parentPlatforms.size() > 0) {
for (ParentPlatform parentPlatform : parentPlatforms) {
commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm);
try {
commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 发送报警: {}", e.getMessage());
}
}
}
}else {
Device device = storage.queryVideoDevice(gbId);
ParentPlatform platform = storage.queryParentPlatByServerGBId(gbId);
if (device != null && platform == null) {
commander.sendAlarmMessage(device, deviceAlarm);
try {
commander.sendAlarmMessage(device, deviceAlarm);
} catch (InvalidArgumentException | SipException | ParseException e) {
logger.error("[命令发送失败] 发送报警: {}", e.getMessage());
}
}else if (device == null && platform != null){
commanderForPlatform.sendAlarmMessage(platform, deviceAlarm);
try {
commanderForPlatform.sendAlarmMessage(platform, deviceAlarm);
} catch (InvalidArgumentException | SipException | ParseException e) {
logger.error("[命令发送失败] 发送报警: {}", e.getMessage());
}
}else {
logger.warn("无法确定" + gbId + "是平台还是设备");
}