优化点播, 级联点播级联录像。级联列表显示订阅状态

This commit is contained in:
648540858
2022-03-14 18:24:30 +08:00
parent 1171cf1ea9
commit 354a39961a
36 changed files with 694 additions and 410 deletions

View File

@@ -0,0 +1,5 @@
package com.genersoft.iot.vmp.gb28181.bean;
public interface InviteStreamCallback {
void call(InviteStreamInfo inviteStreamInfo);
}

View File

@@ -0,0 +1,61 @@
package com.genersoft.iot.vmp.gb28181.bean;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
public class InviteStreamInfo {
public InviteStreamInfo(MediaServerItem mediaServerItem, JSONObject response, String callId, String app, String stream) {
this.mediaServerItem = mediaServerItem;
this.response = response;
this.callId = callId;
this.app = app;
this.stream = stream;
}
private MediaServerItem mediaServerItem;
private JSONObject response;
private String callId;
private String app;
private String stream;
public MediaServerItem getMediaServerItem() {
return mediaServerItem;
}
public void setMediaServerItem(MediaServerItem mediaServerItem) {
this.mediaServerItem = mediaServerItem;
}
public JSONObject getResponse() {
return response;
}
public void setResponse(JSONObject response) {
this.response = response;
}
public String getCallId() {
return callId;
}
public void setCallId(String callId) {
this.callId = callId;
}
public String getApp() {
return app;
}
public void setApp(String app) {
this.app = app;
}
public String getStream() {
return stream;
}
public void setStream(String stream) {
this.stream = stream;
}
}

View File

@@ -114,6 +114,21 @@ public class ParentPlatform {
*/
private String catalogId;
/**
* 已被订阅目录信息
*/
private boolean catalogSubscribe;
/**
* 已被订阅报警信息
*/
private boolean alarmSubscribe;
/**
* 已被订阅GPS信息
*/
private boolean gpsSubscribe;
public Integer getId() {
return id;
}
@@ -290,4 +305,28 @@ public class ParentPlatform {
public void setCatalogId(String catalogId) {
this.catalogId = catalogId;
}
public boolean isCatalogSubscribe() {
return catalogSubscribe;
}
public void setCatalogSubscribe(boolean catalogSubscribe) {
this.catalogSubscribe = catalogSubscribe;
}
public boolean isAlarmSubscribe() {
return alarmSubscribe;
}
public void setAlarmSubscribe(boolean alarmSubscribe) {
this.alarmSubscribe = alarmSubscribe;
}
public boolean isGpsSubscribe() {
return gpsSubscribe;
}
public void setGpsSubscribe(boolean gpsSubscribe) {
this.gpsSubscribe = gpsSubscribe;
}
}

View File

@@ -21,6 +21,8 @@ public class SubscribeInfo {
this.eventType = eventHeader.getEventType();
this.transaction = evt.getServerTransaction();
this.dialog = evt.getDialog();
CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME);
this.callId = callIdHeader.getCallId();
}
private String id;

View File

@@ -49,10 +49,10 @@ public class SipSubscribe {
errorTimeSubscribes.remove(key);
}
}
logger.info("okTimeSubscribes.size:{}",okTimeSubscribes.size());
logger.info("okSubscribes.size:{}",okSubscribes.size());
logger.info("errorTimeSubscribes.size:{}",errorTimeSubscribes.size());
logger.info("errorSubscribes.size:{}",errorSubscribes.size());
logger.debug("okTimeSubscribes.size:{}",okTimeSubscribes.size());
logger.debug("okSubscribes.size:{}",okSubscribes.size());
logger.debug("errorTimeSubscribes.size:{}",errorTimeSubscribes.size());
logger.debug("errorSubscribes.size:{}",errorSubscribes.size());
}
public interface Event {

View File

@@ -117,8 +117,6 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
List<ParentPlatform> parentPlatforms = parentPlatformMap.get(gbId);
if (parentPlatforms != null && parentPlatforms.size() > 0) {
for (ParentPlatform platform : parentPlatforms) {
String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + platform.getServerGBId();
// SubscribeInfo subscribeInfo = redisCatchStorage.getSubscribe(key);
SubscribeInfo subscribeInfo = subscribeHolder.getCatalogSubscribe(platform.getServerGBId());
if (subscribeInfo == null) continue;
logger.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId);

View File

@@ -95,14 +95,14 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
logger.debug("\n收到响应\n{}", responseEvent.getResponse());
int status = response.getStatusCode();
if (((status >= 200) && (status < 300)) || status == 401) { // Success!
if (((status >= 200) && (status < 300)) || status == Response.UNAUTHORIZED) { // Success!
CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME);
String method = cseqHeader.getMethod();
ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method);
if (sipRequestProcessor != null) {
sipRequestProcessor.process(responseEvent);
}
if (responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) {
if (status != Response.UNAUTHORIZED && responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) {
CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
if (callIdHeader != null) {
SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId());

View File

@@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamCallback;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
@@ -103,7 +104,7 @@ public interface ISIPCommander {
* @param startTime 开始时间,格式要求yyyy-MM-dd HH:mm:ss
* @param endTime 结束时间,格式要求yyyy-MM-dd HH:mm:ss
*/
void playbackStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInf, Device device, String channelId, String startTime, String endTime, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent);
void playbackStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInf, Device device, String channelId, String startTime, String endTime,InviteStreamCallback inviteStreamCallback, InviteStreamCallback event, SipSubscribe.Event errorEvent);
/**
* 请求历史媒体下载
@@ -114,13 +115,13 @@ public interface ISIPCommander {
* @param endTime 结束时间,格式要求yyyy-MM-dd HH:mm:ss
* @param downloadSpeed 下载倍速参数
*/
void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String startTime, String endTime, String downloadSpeed, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent);
void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String startTime, String endTime, String downloadSpeed, InviteStreamCallback event, SipSubscribe.Event errorEvent);
/**
* 视频流停止
*/
void streamByeCmd(String deviceId, String channelId, String stream, SipSubscribe.Event okEvent);
void streamByeCmd(String deviceId, String channelId, String stream);
void streamByeCmd(String deviceId, String channelId, String stream, String callId, SipSubscribe.Event okEvent);
void streamByeCmd(String deviceId, String channelId, String stream, String callId);
/**
* 回放暂停

View File

@@ -6,6 +6,8 @@ import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamCallback;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
@@ -445,27 +447,13 @@ public class SIPCommander implements ISIPCommander {
* @param endTime 结束时间,格式要求yyyy-MM-dd HH:mm:ss
*/
@Override
public void playbackStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String startTime, String endTime, ZLMHttpHookSubscribe.Event event
, SipSubscribe.Event errorEvent) {
public void playbackStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
String startTime, String endTime, InviteStreamCallback inviteStreamCallback, InviteStreamCallback hookEvent,
SipSubscribe.Event errorEvent) {
try {
logger.info("{} 分配的ZLM为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
// 添加订阅
JSONObject subscribeKey = new JSONObject();
subscribeKey.put("app", "rtp");
subscribeKey.put("stream", ssrcInfo.getStream());
subscribeKey.put("regist", true);
subscribeKey.put("schema", "rtmp");
subscribeKey.put("mediaServerId", mediaServerItem.getId());
logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString());
subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
(MediaServerItem mediaServerItemInUse, JSONObject json)->{
if (event != null) {
event.response(mediaServerItemInUse, json);
}
});
StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n");
content.append("o="+sipConfig.getId()+" 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n");
@@ -530,6 +518,21 @@ public class SIPCommander implements ISIPCommander {
CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
: udpSipProvider.getNewCallId();
// 添加订阅
JSONObject subscribeKey = new JSONObject();
subscribeKey.put("app", "rtp");
subscribeKey.put("stream", ssrcInfo.getStream());
subscribeKey.put("regist", true);
subscribeKey.put("schema", "rtmp");
subscribeKey.put("mediaServerId", mediaServerItem.getId());
logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey);
subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
(MediaServerItem mediaServerItemInUse, JSONObject json)->{
if (hookEvent != null) {
InviteStreamInfo inviteStreamInfo = new InviteStreamInfo(mediaServerItemInUse, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream());
hookEvent.call(inviteStreamInfo);
}
});
Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader, ssrcInfo.getSsrc());
transmitRequest(device, request, errorEvent, okEvent -> {
@@ -537,6 +540,9 @@ public class SIPCommander implements ISIPCommander {
streamSession.put(device.getDeviceId(), channelId, callIdHeader.getCallId(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), responseEvent.getClientTransaction());
streamSession.put(device.getDeviceId(), channelId, callIdHeader.getCallId(), okEvent.dialog);
});
if (inviteStreamCallback != null) {
inviteStreamCallback.call(new InviteStreamInfo(mediaServerItem, null, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream()));
}
} catch ( SipException | ParseException | InvalidArgumentException e) {
e.printStackTrace();
}
@@ -552,24 +558,11 @@ public class SIPCommander implements ISIPCommander {
* @param downloadSpeed 下载倍速参数
*/
@Override
public void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String startTime, String endTime, String downloadSpeed, ZLMHttpHookSubscribe.Event event
public void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String startTime, String endTime, String downloadSpeed, InviteStreamCallback event
, SipSubscribe.Event errorEvent) {
try {
logger.info("{} 分配的ZLM为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
// 添加订阅
JSONObject subscribeKey = new JSONObject();
subscribeKey.put("app", "rtp");
subscribeKey.put("stream", ssrcInfo.getStream());
subscribeKey.put("regist", true);
subscribeKey.put("mediaServerId", mediaServerItem.getId());
logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString());
subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
(MediaServerItem mediaServerItemInUse, JSONObject json)->{
event.response(mediaServerItemInUse, json);
subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
});
StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n");
content.append("o="+sipConfig.getId()+" 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n");
@@ -637,6 +630,19 @@ public class SIPCommander implements ISIPCommander {
CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
: udpSipProvider.getNewCallId();
// 添加订阅
JSONObject subscribeKey = new JSONObject();
subscribeKey.put("app", "rtp");
subscribeKey.put("stream", ssrcInfo.getStream());
subscribeKey.put("regist", true);
subscribeKey.put("mediaServerId", mediaServerItem.getId());
logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString());
subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
(MediaServerItem mediaServerItemInUse, JSONObject json)->{
event.call(new InviteStreamInfo(mediaServerItem, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream()));
subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
});
Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader, ssrcInfo.getSsrc());
ClientTransaction transaction = transmitRequest(device, request, errorEvent);
@@ -652,15 +658,15 @@ public class SIPCommander implements ISIPCommander {
* 视频流停止, 不使用回调
*/
@Override
public void streamByeCmd(String deviceId, String channelId, String stream) {
streamByeCmd(deviceId, channelId, stream, null);
public void streamByeCmd(String deviceId, String channelId, String stream, String callId) {
streamByeCmd(deviceId, channelId, stream, callId, null);
}
/**
* 视频流停止
*/
@Override
public void streamByeCmd(String deviceId, String channelId, String stream, SipSubscribe.Event okEvent) {
public void streamByeCmd(String deviceId, String channelId, String stream, String callId, SipSubscribe.Event okEvent) {
try {
SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId, null, stream);
ClientTransaction transaction = streamSession.getTransactionByStream(deviceId, channelId, stream);
@@ -672,7 +678,15 @@ public class SIPCommander implements ISIPCommander {
}
return;
}
SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, stream);
SIPDialog dialog;
if (callId != null) {
dialog = streamSession.getDialogByCallId(deviceId, channelId, callId);
}else {
if (stream == null) return;
dialog = streamSession.getDialogByStream(deviceId, channelId, stream);
}
if (dialog == null) {
logger.warn("[ {} -> {}]停止视频流的时候发现对话已丢失", deviceId, channelId);
return;

View File

@@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.SerializeUtils;
import gov.nist.javax.sip.SipProviderImpl;
import gov.nist.javax.sip.SipStackImpl;
import gov.nist.javax.sip.message.MessageFactoryImpl;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.stack.SIPDialog;
import org.slf4j.Logger;
@@ -77,11 +78,11 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
@Override
public boolean unregister(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) {
ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
parentPlatform.setExpires("0");
if (parentPlatformCatch != null) {
parentPlatformCatch.setParentPlatform(parentPlatform);
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
}
parentPlatform.setExpires("0");
return register(parentPlatform, null, null, errorEvent, okEvent, false);
}
@@ -416,11 +417,13 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
private void sendNotify(ParentPlatform parentPlatform, String catalogXmlContent,
SubscribeInfo subscribeInfo, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent )
throws NoSuchFieldException, IllegalAccessException, SipException, ParseException {
MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipFactory.createMessageFactory();
// 设置编码, 防止中文乱码
messageFactory.setDefaultContentEncodingCharset("gb2312");
Dialog dialog = subscribeInfo.getDialog();
Request notifyRequest = dialog.createRequest(Request.NOTIFY);
if (dialog == null) return;
SIPRequest notifyRequest = (SIPRequest)dialog.createRequest(Request.NOTIFY);
ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml");
notifyRequest.setContent(catalogXmlContent, contentTypeHeader);
SubscriptionStateHeader subscriptionState = sipFactory.createHeaderFactory()
@@ -511,7 +514,8 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
}
@Override
public boolean sendNotifyForCatalogOther(String type, ParentPlatform parentPlatform, List<DeviceChannel> deviceChannels, SubscribeInfo subscribeInfo, Integer index) {
public boolean sendNotifyForCatalogOther(String type, ParentPlatform parentPlatform, List<DeviceChannel> deviceChannels,
SubscribeInfo subscribeInfo, Integer index) {
if (parentPlatform == null
|| deviceChannels == null
|| deviceChannels.size() == 0
@@ -579,24 +583,30 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
recordXml.append("<SN>" +recordInfo.getSn() + "</SN>\r\n");
recordXml.append("<DeviceID>" + recordInfo.getDeviceId() + "</DeviceID>\r\n");
recordXml.append("<SumNum>" + recordInfo.getSumNum() + "</SumNum>\r\n");
recordXml.append("<RecordList Num=\"" + recordInfo.getRecordList().size()+"\">\r\n");
for (RecordItem recordItem : recordInfo.getRecordList()) {
recordXml.append("<Item>\r\n");
if (deviceChannel != null) {
recordXml.append("<DeviceID>" + recordItem.getDeviceId() + "</DeviceID>\r\n");
recordXml.append("<Name>" + recordItem.getName() + "</Name>\r\n");
recordXml.append("<StartTime>" + DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(recordItem.getStartTime()) + "</StartTime>\r\n");
recordXml.append("<EndTime>" + DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(recordItem.getEndTime()) + "</EndTime>\r\n");
recordXml.append("<Secrecy>" + recordItem.getSecrecy() + "</Secrecy>\r\n");
recordXml.append("<Type>" + recordItem.getType() + "</Type>\r\n");
if (!StringUtils.isEmpty(recordItem.getFileSize())) {
recordXml.append("<FileSize>" + recordItem.getFileSize() + "</FileSize>\r\n");
}
if (!StringUtils.isEmpty(recordItem.getFilePath())) {
recordXml.append("<FilePath>" + recordItem.getFilePath() + "</FilePath>\r\n");
if (recordInfo.getRecordList() == null ) {
recordXml.append("<RecordList Num=\"0\">\r\n");
}else {
recordXml.append("<RecordList Num=\"" + recordInfo.getRecordList().size()+"\">\r\n");
if (recordInfo.getRecordList().size() > 0) {
for (RecordItem recordItem : recordInfo.getRecordList()) {
recordXml.append("<Item>\r\n");
if (deviceChannel != null) {
recordXml.append("<DeviceID>" + recordItem.getDeviceId() + "</DeviceID>\r\n");
recordXml.append("<Name>" + recordItem.getName() + "</Name>\r\n");
recordXml.append("<StartTime>" + DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(recordItem.getStartTime()) + "</StartTime>\r\n");
recordXml.append("<EndTime>" + DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(recordItem.getEndTime()) + "</EndTime>\r\n");
recordXml.append("<Secrecy>" + recordItem.getSecrecy() + "</Secrecy>\r\n");
recordXml.append("<Type>" + recordItem.getType() + "</Type>\r\n");
if (!StringUtils.isEmpty(recordItem.getFileSize())) {
recordXml.append("<FileSize>" + recordItem.getFileSize() + "</FileSize>\r\n");
}
if (!StringUtils.isEmpty(recordItem.getFilePath())) {
recordXml.append("<FilePath>" + recordItem.getFilePath() + "</FilePath>\r\n");
}
}
recordXml.append("</Item>\r\n");
}
}
recordXml.append("</Item>\r\n");
}
recordXml.append("</RecordList>\r\n");

View File

@@ -27,10 +27,7 @@ import javax.sip.header.CallIdHeader;
import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.*;
/**
* SIP命令类型 ACK请求
@@ -84,44 +81,72 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId());
String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
String deviceId = sendRtpItem.getDeviceId();
StreamInfo streamInfo = null;
if (sendRtpItem.isPlay()) {
streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
}else {
streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, channelId);
}
if (streamInfo == null) {
streamInfo = new StreamInfo();
streamInfo.setApp(sendRtpItem.getApp());
streamInfo.setStream(sendRtpItem.getStreamId());
}
redisCatchStorage.updateSendRTPSever(sendRtpItem);
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
logger.info("收到ACK开始向上级推流 rtp/{}", sendRtpItem.getStreamId());
Map<String, Object> param = new HashMap<>();
param.put("vhost","__defaultVhost__");
param.put("app",streamInfo.getApp());
param.put("stream",streamInfo.getStream());
param.put("app",sendRtpItem.getApp());
param.put("stream",sendRtpItem.getStreamId());
param.put("ssrc", sendRtpItem.getSsrc());
param.put("dst_url",sendRtpItem.getIp());
param.put("dst_port", sendRtpItem.getPort());
param.put("is_udp", is_Udp);
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
if (jsonObject.getInteger("code") != 0) {
logger.info("监听流以等待流上线{}/{}", streamInfo.getApp(), streamInfo.getStream());
// 监听流上线
// 添加订阅
JSONObject subscribeKey = new JSONObject();
subscribeKey.put("app", "rtp");
subscribeKey.put("stream", streamInfo.getStream());
subscribeKey.put("regist", true);
subscribeKey.put("schema", "rtmp");
subscribeKey.put("mediaServerId", sendRtpItem.getMediaServerId());
subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
(MediaServerItem mediaServerItemInUse, JSONObject json)->{
zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
});
}
param.put("src_port", sendRtpItem.getLocalPort());
zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
// if (streamInfo == null) { // 流还没上来对方就回复ack
// logger.info("监听流以等待流上线1 rtp/{}", sendRtpItem.getStreamId());
// // 监听流上线
// // 添加订阅
// JSONObject subscribeKey = new JSONObject();
// subscribeKey.put("app", "rtp");
// subscribeKey.put("stream", sendRtpItem.getStreamId());
// subscribeKey.put("regist", true);
// subscribeKey.put("schema", "rtmp");
// subscribeKey.put("mediaServerId", sendRtpItem.getMediaServerId());
// subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
// (MediaServerItem mediaServerItemInUse, JSONObject json)->{
// Map<String, Object> param = new HashMap<>();
// param.put("vhost","__defaultVhost__");
// param.put("app",json.getString("app"));
// param.put("stream",json.getString("stream"));
// param.put("ssrc", sendRtpItem.getSsrc());
// param.put("dst_url",sendRtpItem.getIp());
// param.put("dst_port", sendRtpItem.getPort());
// param.put("is_udp", is_Udp);
// param.put("src_port", sendRtpItem.getLocalPort());
// zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
// });
// }else {
// Map<String, Object> param = new HashMap<>();
// param.put("vhost","__defaultVhost__");
// param.put("app",streamInfo.getApp());
// param.put("stream",streamInfo.getStream());
// param.put("ssrc", sendRtpItem.getSsrc());
// param.put("dst_url",sendRtpItem.getIp());
// param.put("dst_port", sendRtpItem.getPort());
// param.put("is_udp", is_Udp);
// param.put("src_port", sendRtpItem.getLocalPort());
//
// JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
// if (jsonObject.getInteger("code") != 0) {
// logger.info("监听流以等待流上线2 {}/{}", streamInfo.getApp(), streamInfo.getStream());
// // 监听流上线
// // 添加订阅
// JSONObject subscribeKey = new JSONObject();
// subscribeKey.put("app", "rtp");
// subscribeKey.put("stream", streamInfo.getStream());
// subscribeKey.put("regist", true);
// subscribeKey.put("schema", "rtmp");
// subscribeKey.put("mediaServerId", sendRtpItem.getMediaServerId());
// subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
// (MediaServerItem mediaServerItemInUse, JSONObject json)->{
// zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
// });
// }
// }
}
}
}

View File

@@ -93,14 +93,16 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
param.put("app",sendRtpItem.getApp());
param.put("stream",streamId);
param.put("ssrc",sendRtpItem.getSsrc());
logger.info("停止向上级推流:" + streamId);
logger.info("收到bye:停止向上级推流:" + streamId);
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
redisCatchStorage.deleteSendRTPServer(platformGbId, channelId, callIdHeader.getCallId(), null);
int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
if (totalReaderCount <= 0) {
logger.info(streamId + "无其它观看者,通知设备停止推流");
cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId);
logger.info("收到bye: {}无其它观看者,通知设备停止推流", streamId);
if (sendRtpItem.isPlay()) {
cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId, null);
}
}
}
// 可能是设备主动停止

View File

@@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
@@ -91,6 +92,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
@Autowired
private SIPProcessorObserver sipProcessorObserver;
@Autowired
private VideoStreamSessionManager sessionManager;
@Override
public void afterPropertiesSet() throws Exception {
@@ -233,6 +237,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
String username = sdp.getOrigin().getUsername();
String addressStr = sdp.getOrigin().getAddress();
logger.info("[上级点播]用户:{} 地址:{}:{} ssrc{}", username, addressStr, port, ssrc);
Device device = null;
// 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标
@@ -266,13 +271,14 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
sendRtpItem.setDialog(dialogByteArray);
byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
sendRtpItem.setTransaction(transactionByteArray);
// 写入redis 超时时回复
redisCatchStorage.updateSendRTPSever(sendRtpItem);
Long finalStartTime = startTime;
Long finalStopTime = stopTime;
ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON)->{
logger.info("[上级点播]下级已经开始推流。 回复200OK(SDP) {}/{}", sendRtpItem.getApp(), sendRtpItem.getStreamId());
String app = responseJSON.getString("app");
String stream = responseJSON.getString("stream");
logger.info("[上级点播]下级已经开始推流。 回复200OK(SDP) {}/{}", app, stream);
// * 0 等待设备推流上来
// * 1 下级已经推流等待上级平台回复ack
// * 2 推流中
@@ -325,46 +331,66 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
e.printStackTrace();
}
});
sendRtpItem.setApp("rtp");
if ("Playback".equals(sessionName)) {
sendRtpItem.setPlay(false);
sendRtpItem.setStreamId(ssrc);
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, true);
sendRtpItem.setStreamId(ssrcInfo.getStream());
// 写入redis 超时时回复
redisCatchStorage.updateSendRTPSever(sendRtpItem);
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
playService.playBack(device.getDeviceId(), channelId, format.format(start), format.format(end),result -> {
if (result.getCode() != 0){
logger.warn("录像回放失败");
if (result.getEvent() != null) {
errorEvent.response(result.getEvent());
playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, format.format(start),
format.format(end), null, result -> {
if (result.getCode() != 0){
logger.warn("录像回放失败");
if (result.getEvent() != null) {
errorEvent.response(result.getEvent());
}
redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
try {
responseAck(evt, Response.REQUEST_TIMEOUT);
} catch (SipException e) {
e.printStackTrace();
} catch (InvalidArgumentException e) {
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
}
}else {
if (result.getMediaServerItem() != null) {
hookEvent.response(result.getMediaServerItem(), result.getResponse());
}
}
redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
try {
responseAck(evt, Response.REQUEST_TIMEOUT);
} catch (SipException e) {
e.printStackTrace();
} catch (InvalidArgumentException e) {
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
}
}else {
if (result.getMediaServerItem() != null) {
hookEvent.response(result.getMediaServerItem(), result.getResponse());
}
}
});
});
}else {
sendRtpItem.setPlay(true);
StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId);
if (streamInfo == null) {
SsrcTransaction playTransaction = sessionManager.getSsrcTransaction(device.getDeviceId(), channelId, "play", null);
if (playTransaction != null) {
Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, "rtp", playTransaction.getStream());
if (!streamReady) {
playTransaction = null;
}
}
if (playTransaction == null) {
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, true);
if (mediaServerItem.isRtpEnable()) {
sendRtpItem.setStreamId(String.format("%s_%s", device.getDeviceId(), channelId));
}else {
sendRtpItem.setStreamId(ssrcInfo.getStream());
}
sendRtpItem.setPlay(false);
playService.play(mediaServerItem,device.getDeviceId(), channelId, hookEvent, errorEvent, ()->{
// 写入redis 超时时回复
redisCatchStorage.updateSendRTPSever(sendRtpItem);
playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg)->{
redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
});
}, null);
}else {
sendRtpItem.setStreamId(streamInfo.getStream());
hookEvent.response(mediaServerItem, null);
sendRtpItem.setStreamId(playTransaction.getStream());
// 写入redis 超时时回复
redisCatchStorage.updateSendRTPSever(sendRtpItem);
JSONObject jsonObject = new JSONObject();
jsonObject.put("app", sendRtpItem.getApp());
jsonObject.put("stream", sendRtpItem.getStreamId());
hookEvent.response(mediaServerItem, jsonObject);
}
}
}else if (gbStream != null) {

View File

@@ -18,6 +18,7 @@ import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Response;
import java.text.ParseException;
@@ -56,14 +57,15 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i
} catch (ParseException e) {
e.printStackTrace();
}
CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME);
String NotifyType =getText(rootElement, "NotifyType");
if (NotifyType.equals("121")){
logger.info("媒体播放完毕,通知关流");
StreamInfo streamInfo = redisCatchStorage.queryPlaybackByDevice(device.getDeviceId(), "*");
if (streamInfo != null) {
redisCatchStorage.stopPlayback(streamInfo);
cmder.streamByeCmd(streamInfo.getDeviceID(), streamInfo.getChannelId(), streamInfo.getStream());
}
String channelId =getText(rootElement, "DeviceID");
redisCatchStorage.stopPlayback(device.getDeviceId(), channelId, null, callIdHeader.getCallId());
cmder.streamByeCmd(device.getDeviceId(), channelId, null, callIdHeader.getCallId());
// TODO 如果级联播放,需要给上级发送此通知
}
}

View File

@@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract;
@@ -40,6 +41,9 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract {
@Autowired
private SIPProcessorObserver sipProcessorObserver;
@Autowired
private SubscribeHolder subscribeHolder;
@Override
public void afterPropertiesSet() throws Exception {
// 添加消息处理的订阅
@@ -83,19 +87,19 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract {
// 注册/注销成功
logger.info(String.format("%s %s成功", platformGBId, action));
redisCatchStorage.delPlatformRegisterInfo(callId);
parentPlatform.setStatus("注册".equals(action));
redisCatchStorage.delPlatformCatchInfo(platformGBId);
// 取回Expires设置避免注销过程中被置为0
if (!parentPlatformCatch.getParentPlatform().getExpires().equals("0")) {
ParentPlatform parentPlatformTmp = storager.queryParentPlatByServerGBId(platformGBId);
String expires = parentPlatformTmp.getExpires();
parentPlatform.setExpires(expires);
parentPlatform.setId(parentPlatformTmp.getId());
redisCatchStorage.updatePlatformRegister(parentPlatform);
redisCatchStorage.updatePlatformKeepalive(parentPlatform);
parentPlatformCatch.setParentPlatform(parentPlatform);
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
}
ParentPlatform parentPlatformTmp = storager.queryParentPlatByServerGBId(platformGBId);
parentPlatformTmp.setStatus("注册".equals(action));
redisCatchStorage.updatePlatformRegister(parentPlatformTmp);
redisCatchStorage.updatePlatformKeepalive(parentPlatformTmp);
parentPlatformCatch.setParentPlatform(parentPlatformTmp);
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
storager.updateParentPlatformStatus(platformGBId, "注册".equals(action));
if ("注销".equals(action)) {
subscribeHolder.removeCatalogSubscribe(platformGBId);
subscribeHolder.removeMobilePositionSubscribe(platformGBId);
}
}
}