Merge branch 'wvp-28181-2.0' into main-dev

# Conflicts:
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
#	src/main/java/com/genersoft/iot/vmp/media/zlm/AssistRESTfulUtils.java
#	src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
#	src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java
#	src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
#	src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
#	src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
#	src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java
This commit is contained in:
648540858
2024-01-16 14:10:27 +08:00
116 changed files with 9493 additions and 16489 deletions

View File

@@ -148,13 +148,13 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
if (event.getDeviceChannels() != null) {
deviceChannelList.addAll(event.getDeviceChannels());
}
if (event.getGbStreams() != null && event.getGbStreams().size() > 0){
if (event.getGbStreams() != null && !event.getGbStreams().isEmpty()){
for (GbStream gbStream : event.getGbStreams()) {
deviceChannelList.add(
gbStreamService.getDeviceChannelListByStreamWithStatus(gbStream, gbStream.getCatalogId(), parentPlatform));
}
}
if (deviceChannelList.size() > 0) {
if (!deviceChannelList.isEmpty()) {
logger.info("[Catalog事件: {}]平台:{},影响通道{}个", event.getType(), event.getPlatformId(), deviceChannelList.size());
try {
sipCommanderFroPlatform.sendNotifyForCatalogAddOrUpdate(event.getType(), parentPlatform, deviceChannelList, subscribe, null);
@@ -163,10 +163,10 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
logger.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage());
}
}
}else if (parentPlatformMap.keySet().size() > 0) {
}else if (!parentPlatformMap.keySet().isEmpty()) {
for (String gbId : parentPlatformMap.keySet()) {
List<ParentPlatform> parentPlatforms = parentPlatformMap.get(gbId);
if (parentPlatforms != null && parentPlatforms.size() > 0) {
if (parentPlatforms != null && !parentPlatforms.isEmpty()) {
for (ParentPlatform platform : parentPlatforms) {
SubscribeInfo subscribeInfo = subscribeHolder.getCatalogSubscribe(platform.getServerGBId());
if (subscribeInfo == null) {

View File

@@ -75,6 +75,33 @@ public class VideoStreamSessionManager {
return (SsrcTransaction)redisTemplate.opsForValue().get(scanResult.get(0));
}
public SsrcTransaction getSsrcTransactionByCallId(String callId){
if (ObjectUtils.isEmpty(callId)) {
return null;
}
String key = VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + "_*_*_" + callId+ "_*";
List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
if (!scanResult.isEmpty()) {
return (SsrcTransaction)redisTemplate.opsForValue().get(scanResult.get(0));
}else {
key = VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + "_*_*_play_*";
scanResult = RedisUtil.scan(redisTemplate, key);
if (scanResult.isEmpty()) {
return null;
}
for (Object keyObj : scanResult) {
SsrcTransaction ssrcTransaction = (SsrcTransaction)redisTemplate.opsForValue().get(keyObj);
if (ssrcTransaction.getSipTransactionInfo() != null &&
ssrcTransaction.getSipTransactionInfo().getCallId().equals(callId)) {
return ssrcTransaction;
}
}
return null;
}
}
public List<SsrcTransaction> getSsrcTransactionForAll(String deviceId, String channelId, String callId, String stream){
if (ObjectUtils.isEmpty(deviceId)) {
deviceId ="*";
@@ -117,8 +144,19 @@ public class VideoStreamSessionManager {
}
public void remove(String deviceId, String channelId, String stream) {
SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId, null, stream);
if (ssrcTransaction == null) {
List<SsrcTransaction> ssrcTransactionList = getSsrcTransactionForAll(deviceId, channelId, null, stream);
if (ssrcTransactionList == null || ssrcTransactionList.isEmpty()) {
return;
}
for (SsrcTransaction ssrcTransaction : ssrcTransactionList) {
redisTemplate.delete(VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + "_"
+ deviceId + "_" + channelId + "_" + ssrcTransaction.getCallId() + "_" + ssrcTransaction.getStream());
}
}
public void removeByCallId(String deviceId, String channelId, String callId) {
SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId, callId, null);
if (ssrcTransaction == null ) {
return;
}
redisTemplate.delete(VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + "_"

View File

@@ -129,4 +129,6 @@ public class SipRunner implements CommandLineRunner {
}
}
}
}

View File

@@ -164,6 +164,7 @@ public class SIPRequestHeaderProvider {
Request request = null;
//请求行
SipURI requestLine = SipFactory.getInstance().createAddressFactory().createSipURI(channelId, device.getHostAddress());
// SipURI requestLine = SipFactory.getInstance().createAddressFactory().createSipURI(device.getDeviceId(), device.getHostAddress());
// via
ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>();
ViaHeader viaHeader = SipFactory.getInstance().createHeaderFactory().createViaHeader(sipLayer.getLocalIp(device.getLocalIp()), sipConfig.getPort(), device.getTransport(), SipUtils.getNewViaTag());
@@ -174,6 +175,7 @@ public class SIPRequestHeaderProvider {
FromHeader fromHeader = SipFactory.getInstance().createHeaderFactory().createFromHeader(fromAddress, transactionInfo.getFromTag());
//to
SipURI toSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(channelId,device.getHostAddress());
// SipURI toSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(device.getDeviceId(),device.getHostAddress());
Address toAddress = SipFactory.getInstance().createAddressFactory().createAddress(toSipURI);
ToHeader toHeader = SipFactory.getInstance().createHeaderFactory().createToHeader(toAddress, transactionInfo.getToTag());

View File

@@ -40,6 +40,8 @@ import javax.sip.SipFactory;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Request;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
/**
* @description:设备能力接口,用于定义设备的控制、查询能力
@@ -677,22 +679,21 @@ public class SIPCommander implements ISIPCommander {
*/
@Override
public void streamByeCmd(Device device, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException {
SsrcTransaction ssrcTransaction;
if (callId != null) {
ssrcTransaction = streamSession.getSsrcTransaction(null, null, callId, null);
}else {
ssrcTransaction = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, null, stream);
}
if (ssrcTransaction == null) {
List<SsrcTransaction> ssrcTransactionList = streamSession.getSsrcTransactionForAll(device.getDeviceId(), channelId, callId, stream);
if (ssrcTransactionList == null || ssrcTransactionList.isEmpty()) {
logger.info("[发送BYE] 未找到事务信息,设备: device: {}, channel: {}", device.getDeviceId(), channelId);
throw new SsrcTransactionNotFoundException(device.getDeviceId(), channelId, callId, stream);
}
mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream());
streamSession.remove(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
for (SsrcTransaction ssrcTransaction : ssrcTransactionList) {
logger.info("[发送BYE] 设备: device: {}, channel: {}, callId: {}", device.getDeviceId(), channelId, ssrcTransaction.getCallId());
mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
Request byteRequest = headerProvider.createByteRequest(device, channelId, ssrcTransaction.getSipTransactionInfo());
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), byteRequest, null, okEvent);
mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream());
streamSession.removeByCallId(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getCallId());
Request byteRequest = headerProvider.createByteRequest(device, channelId, ssrcTransaction.getSipTransactionInfo());
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), byteRequest, null, okEvent);
}
}
@Override

View File

@@ -579,7 +579,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
@Override
public void sendNotifyForCatalogAddOrUpdate(String type, ParentPlatform parentPlatform, List<DeviceChannel> deviceChannels, SubscribeInfo subscribeInfo, Integer index) throws InvalidArgumentException, ParseException, NoSuchFieldException, SipException, IllegalAccessException {
if (parentPlatform == null || deviceChannels == null || deviceChannels.size() == 0 || subscribeInfo == null) {
if (parentPlatform == null || deviceChannels == null || deviceChannels.isEmpty() || subscribeInfo == null) {
return;
}
if (index == null) {
@@ -597,6 +597,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
Integer finalIndex = index;
String catalogXmlContent = getCatalogXmlContentForCatalogAddOrUpdate(parentPlatform, channels,
deviceChannels.size(), type, subscribeInfo);
logger.info("[发送NOTIFY通知]类型: {},发送数量: {}", type, channels.size());
sendNotify(parentPlatform, catalogXmlContent, subscribeInfo, eventResult -> {
logger.error("发送NOTIFY通知消息失败。错误{} {}", eventResult.statusCode, eventResult.msg);
}, (eventResult -> {
@@ -620,7 +621,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
SIPRequest notifyRequest = headerProviderPlatformProvider.createNotifyRequest(parentPlatform, catalogXmlContent, subscribeInfo);
sipSender.transmitRequest(parentPlatform.getDeviceIp(), notifyRequest);
sipSender.transmitRequest(parentPlatform.getDeviceIp(), notifyRequest, errorEvent, okEvent);
}
private String getCatalogXmlContentForCatalogAddOrUpdate(ParentPlatform parentPlatform, List<DeviceChannel> channels, int sumNum, String type, SubscribeInfo subscribeInfo) {
@@ -632,9 +633,9 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
.append("<CmdType>Catalog</CmdType>\r\n")
.append("<SN>" + (int) ((Math.random() * 9 + 1) * 100000) + "</SN>\r\n")
.append("<DeviceID>" + parentPlatform.getDeviceGBId() + "</DeviceID>\r\n")
.append("<SumNum>1</SumNum>\r\n")
.append("<SumNum>"+ sumNum +"</SumNum>\r\n")
.append("<DeviceList Num=\"" + channels.size() + "\">\r\n");
if (channels.size() > 0) {
if (!channels.isEmpty()) {
for (DeviceChannel channel : channels) {
if (parentPlatform.getServerGBId().equals(channel.getParentId())) {
channel.setParentId(parentPlatform.getDeviceGBId());

View File

@@ -33,6 +33,7 @@ import javax.sip.header.CallIdHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
@@ -167,14 +168,12 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
}
}
// 发流端发送的停止
SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(null, null, callIdHeader.getCallId(), null);
if (ssrcTransaction == null ) {
logger.info("[收到bye] 但是无法获取推流信息和发流信息,忽略此请求");
logger.info(request.toString());
return;
}
// 可能是设备发送的停止
SsrcTransaction ssrcTransaction = streamSession.getSsrcTransactionByCallId(callIdHeader.getCallId());
if (ssrcTransaction == null) {
return;
}
logger.info("[收到bye] 来自设备:{}, 通道已停止推流: {}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
ParentPlatform platform = platformService.queryPlatformByServerGBId(ssrcTransaction.getDeviceId());
if (platform != null ) {
@@ -216,7 +215,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
if (mediaServerItem != null) {
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransaction.getSsrc());
}
streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcTransaction.getStream());
streamSession.removeByCallId(device.getDeviceId(), channel.getChannelId(), ssrcTransaction.getCallId());
if (ssrcTransaction.getType() == InviteSessionType.BROADCAST) {
// 查找来源的对讲设备,发送停止
Device sourceDevice = storager.queryVideoDeviceByPlatformIdAndChannelId(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());

View File

@@ -152,7 +152,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
String requesterId = SipUtils.getUserIdFromFromHeader(request);
CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
if (requesterId == null || channelId == null) {
logger.info("无法从FromHeader的Address中获取到平台id返回400");
logger.info("无法从请求中获取到平台id返回400");
// 参数不全, 发400请求错误
try {
responseAck(request, Response.BAD_REQUEST);
@@ -745,13 +745,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
logger.info("[ app={}, stream={} ] 等待设备开始推流超时", gbStream.getApp(), gbStream.getStream());
try {
redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
responseAck(request, Response.REQUEST_TIMEOUT); // 超时
} catch (SipException e) {
logger.error("未处理的异常 ", e);
} catch (InvalidArgumentException e) {
logger.error("未处理的异常 ", e);
} catch (ParseException e) {
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("未处理的异常 ", e);
}
}, userSetting.getPlatformPlayTimeout());
@@ -762,6 +759,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
// 添加在本机上线的通知
mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (app, stream, serverId) -> {
dynamicTask.stop(callIdHeader.getCallId());
redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
if (serverId.equals(userSetting.getServerId())) {
SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
app, stream, channelId, mediaTransmissionTCP, platform.isRtcp());

View File

@@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.slf4j.Logger;
@@ -185,6 +186,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
// 判断此通道是否存在
DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId());
if (deviceChannel != null) {
logger.info("[增加通道] 已存在,不发送通知只更新,设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
channel.setId(deviceChannel.getId());
updateChannelMap.put(channel.getChannelId(), channel);
if (updateChannelMap.keySet().size() > 300) {
@@ -222,6 +224,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId());
if (deviceChannelForUpdate != null) {
channel.setId(deviceChannelForUpdate.getId());
channel.setUpdateTime(DateUtil.getNow());
updateChannelMap.put(channel.getChannelId(), channel);
if (updateChannelMap.keySet().size() > 300) {
executeSaveForUpdate();
@@ -244,11 +247,11 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
// 转发变化信息
eventPublisher.catalogEventPublish(null, channel, event);
if (updateChannelMap.keySet().size() > 0
|| addChannelMap.keySet().size() > 0
|| updateChannelOnlineList.size() > 0
|| updateChannelOfflineList.size() > 0
|| deleteChannelList.size() > 0) {
if (!updateChannelMap.keySet().isEmpty()
|| !addChannelMap.keySet().isEmpty()
|| !updateChannelOnlineList.isEmpty()
|| !updateChannelOfflineList.isEmpty()
|| !deleteChannelList.isEmpty()) {
if (!dynamicTask.contains(talkKey)) {
dynamicTask.startDelay(talkKey, this::executeSave, 1000);
@@ -262,16 +265,36 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
}
private void executeSave(){
executeSaveForAdd();
executeSaveForUpdate();
executeSaveForDelete();
executeSaveForOnline();
executeSaveForOffline();
try {
executeSaveForAdd();
} catch (Exception e) {
logger.error("[存储收到的增加通道] 异常: ", e );
}
try {
executeSaveForUpdate();
} catch (Exception e) {
logger.error("[存储收到的更新通道] 异常: ", e );
}
try {
executeSaveForDelete();
} catch (Exception e) {
logger.error("[存储收到的删除通道] 异常: ", e );
}
try {
executeSaveForOnline();
} catch (Exception e) {
logger.error("[存储收到的通道上线] 异常: ", e );
}
try {
executeSaveForOffline();
} catch (Exception e) {
logger.error("[存储收到的通道离线] 异常: ", e );
}
dynamicTask.stop(talkKey);
}
private void executeSaveForUpdate(){
if (updateChannelMap.values().size() > 0) {
if (!updateChannelMap.values().isEmpty()) {
ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
updateChannelMap.clear();
deviceChannelService.batchUpdateChannel(deviceChannels);
@@ -280,7 +303,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
}
private void executeSaveForAdd(){
if (addChannelMap.values().size() > 0) {
if (!addChannelMap.values().isEmpty()) {
ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(addChannelMap.values());
addChannelMap.clear();
deviceChannelService.batchAddChannel(deviceChannels);
@@ -288,21 +311,21 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
}
private void executeSaveForDelete(){
if (deleteChannelList.size() > 0) {
if (!deleteChannelList.isEmpty()) {
deviceChannelService.deleteChannels(deleteChannelList);
deleteChannelList.clear();
}
}
private void executeSaveForOnline(){
if (updateChannelOnlineList.size() > 0) {
if (!updateChannelOnlineList.isEmpty()) {
deviceChannelService.channelsOnline(updateChannelOnlineList);
updateChannelOnlineList.clear();
}
}
private void executeSaveForOffline(){
if (updateChannelOfflineList.size() > 0) {
if (!updateChannelOfflineList.isEmpty()) {
deviceChannelService.channelsOffline(updateChannelOfflineList);
updateChannelOfflineList.clear();
}

View File

@@ -76,7 +76,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
RemoteAddressInfo remoteAddressInfo = SipUtils.getRemoteAddressFromRequest(request, userSetting.getSipUseSourceIpAsRemoteAddress());
if (!device.getIp().equalsIgnoreCase(remoteAddressInfo.getIp()) || device.getPort() != remoteAddressInfo.getPort()) {
logger.info("[心跳] 设备{}地址变化, 远程地址为: {}:{}", device.getDeviceId(), remoteAddressInfo.getIp(), remoteAddressInfo.getPort());
logger.info("[收到心跳] 设备{}地址变化, 远程地址为: {}:{}", device.getDeviceId(), remoteAddressInfo.getIp(), remoteAddressInfo.getPort());
device.setPort(remoteAddressInfo.getPort());
device.setHostAddress(remoteAddressInfo.getIp().concat(":").concat(String.valueOf(remoteAddressInfo.getPort())));
device.setIp(remoteAddressInfo.getIp());

View File

@@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.dom4j.Attribute;
import org.dom4j.Document;
@@ -214,8 +215,11 @@ public class XmlUtil {
return deviceChannel;
}
Element nameElement = itemDevice.element("Name");
if (nameElement != null) {
// 当通道名称为空时,设置通道名称为通道编码,避免级联时因通道名称为空导致上级接收通道失败
if (nameElement != null && StringUtils.isNotBlank(nameElement.getText())) {
deviceChannel.setName(nameElement.getText());
} else {
deviceChannel.setName(channelId);
}
if(channelId.length() <= 8) {
deviceChannel.setHasAudio(false);