From 819ec97d6653af3a2daab3e9e2914662a8086a8d Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: Fri, 12 Apr 2024 16:05:29 +0800
Subject: [PATCH 01/50] =?UTF-8?q?=E7=A7=BB=E9=99=A4=E5=85=A8=E9=87=8F?=
=?UTF-8?q?=E8=84=9A=E6=9C=AC=E5=A4=9A=E4=BD=99=E5=AD=97=E6=AE=B5?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
数据库/2.7.0/初始化-mysql-2.7.0.sql | 1 -
数据库/2.7.0/初始化-postgresql-kingbase-2.7.0.sql | 1 -
2 files changed, 2 deletions(-)
diff --git a/数据库/2.7.0/初始化-mysql-2.7.0.sql b/数据库/2.7.0/初始化-mysql-2.7.0.sql
index 2cd88e725..6d7864fab 100644
--- a/数据库/2.7.0/初始化-mysql-2.7.0.sql
+++ b/数据库/2.7.0/初始化-mysql-2.7.0.sql
@@ -31,7 +31,6 @@ create table wvp_device (
password character varying(255),
as_message_channel bool default false,
keepalive_interval_time integer,
- switch_primary_sub_stream bool default false,
broadcast_push_after_ack bool default false,
constraint uk_device_device unique (device_id)
);
diff --git a/数据库/2.7.0/初始化-postgresql-kingbase-2.7.0.sql b/数据库/2.7.0/初始化-postgresql-kingbase-2.7.0.sql
index 5cda94572..c81ca31bf 100644
--- a/数据库/2.7.0/初始化-postgresql-kingbase-2.7.0.sql
+++ b/数据库/2.7.0/初始化-postgresql-kingbase-2.7.0.sql
@@ -31,7 +31,6 @@ create table wvp_device (
password character varying(255),
as_message_channel bool default false,
keepalive_interval_time integer,
- switch_primary_sub_stream bool default false,
broadcast_push_after_ack bool default false,
constraint uk_device_device unique (device_id)
);
From 3cd3e97cd7e2bd56405aa991347b480704cee949 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: Fri, 12 Apr 2024 16:06:21 +0800
Subject: [PATCH 02/50] =?UTF-8?q?=E5=90=88=E5=B9=B6=E5=88=86=E6=94=AF270?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
数据库/2.7.1/初始化-mysql-2.7.0.sql | 1 -
数据库/2.7.1/初始化-postgresql-kingbase-2.7.0.sql | 1 -
2 files changed, 2 deletions(-)
diff --git a/数据库/2.7.1/初始化-mysql-2.7.0.sql b/数据库/2.7.1/初始化-mysql-2.7.0.sql
index cf614d914..48ed8cbda 100644
--- a/数据库/2.7.1/初始化-mysql-2.7.0.sql
+++ b/数据库/2.7.1/初始化-mysql-2.7.0.sql
@@ -31,7 +31,6 @@ create table wvp_device (
password character varying(255),
as_message_channel bool default false,
keepalive_interval_time integer,
- switch_primary_sub_stream bool default false,
broadcast_push_after_ack bool default false,
constraint uk_device_device unique (device_id)
);
diff --git a/数据库/2.7.1/初始化-postgresql-kingbase-2.7.0.sql b/数据库/2.7.1/初始化-postgresql-kingbase-2.7.0.sql
index 317e9e09f..78bafe209 100644
--- a/数据库/2.7.1/初始化-postgresql-kingbase-2.7.0.sql
+++ b/数据库/2.7.1/初始化-postgresql-kingbase-2.7.0.sql
@@ -31,7 +31,6 @@ create table wvp_device (
password character varying(255),
as_message_channel bool default false,
keepalive_interval_time integer,
- switch_primary_sub_stream bool default false,
broadcast_push_after_ack bool default false,
constraint uk_device_device unique (device_id)
);
From bd0fafde817b60dee79b1e4dd091a5e34ac8625c Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: Mon, 15 Apr 2024 15:51:10 +0800
Subject: [PATCH 03/50] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E7=9B=AE=E5=BD=95?=
=?UTF-8?q?=E8=AE=A2=E9=98=85=E5=8F=91=E9=80=81=E8=AE=A2=E9=98=85=E6=9C=AA?=
=?UTF-8?q?=E6=90=BA=E5=B8=A6=E7=BB=8F=E7=BA=AC=E5=BA=A6=E7=9A=84=E9=97=AE?=
=?UTF-8?q?=E9=A2=98?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../transmit/cmd/impl/SIPCommanderFroPlatform.java | 2 ++
数据库/2.7.0/更新-mysql-2.7.0.sql | 12 +++++++++++-
数据库/2.7.0/更新-postgresql-kingbase-2.7.0.sql | 12 +++++++++++-
3 files changed, 24 insertions(+), 2 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
index 296465464..2082d39e6 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
@@ -665,6 +665,8 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
.append(" " + channel.getOwner()+ "\r\n")
.append("" + channel.getCivilCode() + "\r\n")
.append("
" + channel.getAddress() + "\r\n");
+ catalogXml.append("" + channel.getLongitude() + "\r\n");
+ catalogXml.append("" + channel.getLatitude() + "\r\n");
}
if (!"presence".equals(subscribeInfo.getEventType())) {
catalogXml.append("" + type + "\r\n");
diff --git a/数据库/2.7.0/更新-mysql-2.7.0.sql b/数据库/2.7.0/更新-mysql-2.7.0.sql
index c229fb1ed..b14a5c84f 100644
--- a/数据库/2.7.0/更新-mysql-2.7.0.sql
+++ b/数据库/2.7.0/更新-mysql-2.7.0.sql
@@ -4,5 +4,15 @@ alter table wvp_device_channel
alter table wvp_device
drop switch_primary_sub_stream;
+# 第一个补丁包
alter table wvp_platform
- add send_stream_ip character varying(50);
\ No newline at end of file
+ add send_stream_ip character varying(50);
+
+alter table wvp_device
+ change on_line on_line bool default false;
+
+alter table wvp_device
+ change id id serial primary key;
+
+alter table wvp_device
+ change ssrc_check ssrc_check bool default false;
\ No newline at end of file
diff --git a/数据库/2.7.0/更新-postgresql-kingbase-2.7.0.sql b/数据库/2.7.0/更新-postgresql-kingbase-2.7.0.sql
index c229fb1ed..b14a5c84f 100644
--- a/数据库/2.7.0/更新-postgresql-kingbase-2.7.0.sql
+++ b/数据库/2.7.0/更新-postgresql-kingbase-2.7.0.sql
@@ -4,5 +4,15 @@ alter table wvp_device_channel
alter table wvp_device
drop switch_primary_sub_stream;
+# 第一个补丁包
alter table wvp_platform
- add send_stream_ip character varying(50);
\ No newline at end of file
+ add send_stream_ip character varying(50);
+
+alter table wvp_device
+ change on_line on_line bool default false;
+
+alter table wvp_device
+ change id id serial primary key;
+
+alter table wvp_device
+ change ssrc_check ssrc_check bool default false;
\ No newline at end of file
From cdeb3acf7ce232e8f3d7f91a9474e31278e09d7f Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: Mon, 15 Apr 2024 16:13:56 +0800
Subject: [PATCH 04/50] =?UTF-8?q?=E7=A7=BB=E9=99=A4=E5=A4=9A=E4=BD=99?=
=?UTF-8?q?=E6=96=87=E4=BB=B6?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java | 0
1 file changed, 0 insertions(+), 0 deletions(-)
delete mode 100644 src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java
deleted file mode 100644
index e69de29bb..000000000
From f9abfca003bc9515f1f6028657fa6347326a1402 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: Mon, 15 Apr 2024 21:33:22 +0800
Subject: [PATCH 05/50] =?UTF-8?q?=E4=B8=B4=E6=97=B6=E6=8F=90=E4=BA=A4?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../iot/vmp/common/VideoManagerConstants.java | 2 +
.../vmp/conf/redis/RedisMsgListenConfig.java | 12 +-
.../iot/vmp/gb28181/bean/SendRtpItem.java | 28 ++
.../request/impl/InviteRequestProcessor.java | 386 +++++++-------
.../vmp/media/zlm/dto/ChannelOnlineEvent.java | 4 +-
.../service/bean/MessageForPushChannel.java | 1 +
.../redisMsg/RedisGbPlayMsgListener.java | 474 ------------------
.../RedisPlatformStartSendRtpListener.java | 81 +++
...sPlatformWaitPushStreamOnlineListener.java | 81 +++
.../iot/vmp/storager/IRedisCatchStorage.java | 3 +
.../storager/impl/RedisCatchStorageImpl.java | 12 +
11 files changed, 395 insertions(+), 689 deletions(-)
delete mode 100755 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
create mode 100755 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java
create mode 100755 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java
diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
index d19b8f051..af574b932 100644
--- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
+++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -72,6 +72,8 @@ public class VideoManagerConstants {
public static final String REGISTER_EXPIRE_TASK_KEY_PREFIX = "VMP_device_register_expire_";
public static final String PUSH_STREAM_LIST = "VMP_PUSH_STREAM_LIST_";
+ public static final String WAITE_SEND_PUSH_STREAM = "VMP_WAITE_SEND_PUSH_STREAM:";
+ public static final String START_SEND_PUSH_STREAM = "VMP_START_SEND_PUSH_STREAM:";
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
index c14ebcdd0..fb88f54a8 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
@@ -31,9 +31,6 @@ public class RedisMsgListenConfig {
@Autowired
private RedisStreamMsgListener redisStreamMsgListener;
- @Autowired
- private RedisGbPlayMsgListener redisGbPlayMsgListener;
-
@Autowired
private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener;
@@ -49,6 +46,12 @@ public class RedisMsgListenConfig {
@Autowired
private RedisPushStreamCloseResponseListener redisPushStreamCloseResponseListener;
+ @Autowired
+ private RedisPlatformWaitPushStreamOnlineListener redisPlatformWaitPushStreamOnlineListener;
+
+ @Autowired
+ private RedisPlatformStartSendRtpListener redisPlatformStartSendRtpListener;
+
/**
* redis消息监听器容器 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
@@ -65,12 +68,13 @@ public class RedisMsgListenConfig {
container.addMessageListener(redisGPSMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_GPS));
container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE));
container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH"));
- container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY));
container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE));
container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE));
container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED));
+ container.addMessageListener(redisPlatformWaitPushStreamOnlineListener, new PatternTopic(VideoManagerConstants.WAITE_SEND_PUSH_STREAM));
+ container.addMessageListener(redisPlatformStartSendRtpListener, new PatternTopic(VideoManagerConstants.START_SEND_PUSH_STREAM));
return container;
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
index 30193d275..c0507df08 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
@@ -22,6 +22,11 @@ public class SendRtpItem {
*/
private String platformId;
+ /**
+ * 平台名称
+ */
+ private String platformName;
+
/**
* 对应设备id
*/
@@ -61,6 +66,11 @@ public class SendRtpItem {
*/
private boolean tcpActive;
+ /**
+ * 自己推流使用的IP
+ */
+ private String localIp;
+
/**
* 自己推流使用的端口
*/
@@ -306,6 +316,22 @@ public class SendRtpItem {
this.receiveStream = receiveStream;
}
+ public String getPlatformName() {
+ return platformName;
+ }
+
+ public void setPlatformName(String platformName) {
+ this.platformName = platformName;
+ }
+
+ public String getLocalIp() {
+ return localIp;
+ }
+
+ public void setLocalIp(String localIp) {
+ this.localIp = localIp;
+ }
+
@Override
public String toString() {
return "SendRtpItem{" +
@@ -313,6 +339,7 @@ public class SendRtpItem {
", port=" + port +
", ssrc='" + ssrc + '\'' +
", platformId='" + platformId + '\'' +
+ ", platformName='" + platformName + '\'' +
", deviceId='" + deviceId + '\'' +
", app='" + app + '\'' +
", channelId='" + channelId + '\'' +
@@ -320,6 +347,7 @@ public class SendRtpItem {
", stream='" + stream + '\'' +
", tcp=" + tcp +
", tcpActive=" + tcpActive +
+ ", localIp=" + localIp +
", localPort=" + localPort +
", mediaServerId='" + mediaServerId + '\'' +
", serverId='" + serverId + '\'' +
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
index 96b8b11e5..353992274 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -18,6 +18,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
+import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
@@ -28,7 +29,6 @@ import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
-import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
@@ -127,13 +127,12 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
@Autowired
private SipConfig config;
-
- @Autowired
- private RedisGbPlayMsgListener redisGbPlayMsgListener;
-
@Autowired
private VideoStreamSessionManager streamSession;
+ @Autowired
+ private SendRtpPortManager sendRtpPortManager;
+
@Override
public void afterPropertiesSet() throws Exception {
@@ -577,21 +576,40 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}else {
ssrc = gb28181Sdp.getSsrc();
}
+ SendRtpItem sendRtpItem = new SendRtpItem();
+ sendRtpItem.setTcpActive(tcpActive);
+ sendRtpItem.setTcp(mediaTransmissionTCP);
+ sendRtpItem.setRtcp(platform.isRtcp());
+ sendRtpItem.setSsrc(ssrc);
+ sendRtpItem.setPlatformName(platform.getName());
+ sendRtpItem.setPlatformId(platform.getServerGBId());
+ sendRtpItem.setMediaServerId(mediaServerItem.getId());
+ sendRtpItem.setChannelId(channelId);
+ sendRtpItem.setIp(addressStr);
+ sendRtpItem.setPort(port);
+ sendRtpItem.setUsePs(true);
+ sendRtpItem.setApp(gbStream.getApp());
+ sendRtpItem.setStream(gbStream.getStream());
+ sendRtpItem.setCallId(callIdHeader.getCallId());
+ sendRtpItem.setFromTag(request.getFromTag());
+ sendRtpItem.setOnlyAudio(false);
+ sendRtpItem.setPlayType(InviteStreamType.PUSH);
+ sendRtpItem.setStatus(0);
if ("push".equals(gbStream.getStreamType())) {
if (streamPushItem != null) {
// 从redis查询是否正在接收这个推流
OnStreamChangedHookParam pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream());
+
+ sendRtpItem.setServerId(pushListItem.getSeverId());
if (pushListItem != null) {
StreamPushItem transform = streamPushService.transform(pushListItem);
transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId()));
// 推流状态
- pushStream(evt, request, gbStream, transform, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+ pushStream(sendRtpItem, mediaServerItem, platform, request);
}else {
// 未推流 拉起
- notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+ notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request);
}
}
} else if ("proxy".equals(gbStream.getStreamType())) {
@@ -601,8 +619,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
} else {
//开启代理拉流
- notifyStreamOnline(evt, request, gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+ notifyProxyStreamOnline(sendRtpItem, mediaServerItem, platform, request);
}
}
@@ -659,8 +676,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
sendRtpItem.setStatus(1);
sendRtpItem.setCallId(callIdHeader.getCallId());
sendRtpItem.setFromTag(request.getFromTag());
+ sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
- SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt);
+ SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
if (response != null) {
sendRtpItem.setToTag(response.getToTag());
}
@@ -670,19 +688,14 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
- private void pushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
- CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
- int port, Boolean tcpActive, boolean mediaTransmissionTCP,
- String channelId, String addressStr, String ssrc, String requesterId) {
+ private void pushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
// 推流
- if (streamPushItem.isSelf()) {
- Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
+ if (sendRtpItem.getServerId().equals(userSetting.getServerId())) {
+ Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
if (streamReady != null && streamReady) {
// 自平台内容
- SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
- gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
-
- if (sendRtpItem == null) {
+ int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
+ if (localPort == 0) {
logger.warn("服务器端口资源不足");
try {
responseAck(request, Response.BUSY_HERE);
@@ -691,16 +704,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
return;
}
- if (tcpActive != null) {
- sendRtpItem.setTcpActive(tcpActive);
- }
- sendRtpItem.setPlayType(InviteStreamType.PUSH);
// 写入redis, 超时时回复
sendRtpItem.setStatus(1);
- sendRtpItem.setCallId(callIdHeader.getCallId());
-
sendRtpItem.setFromTag(request.getFromTag());
- SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt);
+ sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
+ SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
if (response != null) {
sendRtpItem.setToTag(response.getToTag());
}
@@ -708,210 +716,168 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} else {
// 不在线 拉起
- notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+ notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request);
}
-
} else {
+ SendRtpItem sendRtpItem = new SendRtpItem();
+ sendRtpItem.setRtcp(platform.isRtcp());
+ sendRtpItem.setTcp(mediaTransmissionTCP);
+ sendRtpItem.setTcpActive();
// 其他平台内容
- otherWvpPushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+ otherWvpPushStream(sendRtpItem, request, platform);
}
}
/**
* 通知流上线
*/
- private void notifyStreamOnline(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
- CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
- int port, Boolean tcpActive, boolean mediaTransmissionTCP,
- String channelId, String addressStr, String ssrc, String requesterId) {
- if ("proxy".equals(gbStream.getStreamType())) {
- // TODO 控制启用以使设备上线
- logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", gbStream.getApp(), gbStream.getStream());
- // 监听流上线
- HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(gbStream.getApp(), gbStream.getStream(), true, "rtsp", mediaServerItem.getId());
- zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> {
- OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
- logger.info("[上级点播]拉流代理已经就绪, {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream());
- dynamicTask.stop(callIdHeader.getCallId());
- pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
- });
- dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
- logger.info("[ app={}, stream={} ] 等待拉流代理流超时", gbStream.getApp(), gbStream.getStream());
- zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
- }, userSetting.getPlatformPlayTimeout());
- boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream());
- if (!start) {
- try {
- responseAck(request, Response.BUSY_HERE, "channel [" + gbStream.getGbId() + "] offline");
- } catch (SipException | InvalidArgumentException | ParseException e) {
- logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
- }
- zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
- dynamicTask.stop(callIdHeader.getCallId());
+ private void notifyProxyStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
+ // TODO 控制启用以使设备上线
+ logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", gbStream.getApp(), gbStream.getStream());
+ // 监听流上线
+ HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(gbStream.getApp(), gbStream.getStream(), true, "rtsp", mediaServerItem.getId());
+ zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> {
+ OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
+ logger.info("[上级点播]拉流代理已经就绪, {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream());
+ dynamicTask.stop(callIdHeader.getCallId());
+ pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
+ mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+ });
+ dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
+ logger.info("[ app={}, stream={} ] 等待拉流代理流超时", gbStream.getApp(), gbStream.getStream());
+ zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
+ }, userSetting.getPlatformPlayTimeout());
+ boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream());
+ if (!start) {
+ try {
+ responseAck(request, Response.BUSY_HERE, "channel [" + gbStream.getGbId() + "] offline");
+ } catch (SipException | InvalidArgumentException | ParseException e) {
+ logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
}
- } else if ("push".equals(gbStream.getStreamType())) {
- if (!platform.isStartOfflinePush()) {
- // 平台设置中关闭了拉起离线的推流则直接回复
- try {
- logger.info("[上级点播] 失败,推流设备未推流,channel: {}, app: {}, stream: {}", gbStream.getGbId(), gbStream.getApp(), gbStream.getStream());
- responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
- } catch (SipException | InvalidArgumentException | ParseException e) {
- logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
- }
- return;
- }
- // 发送redis消息以使设备上线
- logger.info("[ app={}, stream={} ]通道未推流,发送redis信息控制设备开始推流", gbStream.getApp(), gbStream.getStream());
-
- MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1,
- gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(),
- platform.getName(), null, gbStream.getMediaServerId());
- redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
- // 设置超时
- dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
- logger.info("[ app={}, stream={} ] 等待设备开始推流超时", gbStream.getApp(), gbStream.getStream());
- try {
- redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
- mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
- responseAck(request, Response.REQUEST_TIMEOUT); // 超时
- } catch (SipException | InvalidArgumentException | ParseException e) {
- logger.error("未处理的异常 ", e);
- }
- }, userSetting.getPlatformPlayTimeout());
- // 添加监听
- int finalPort = port;
- Boolean finalTcpActive = tcpActive;
-
- // 添加在本机上线的通知
- mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (app, stream, serverId) -> {
- dynamicTask.stop(callIdHeader.getCallId());
- redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
- if (serverId.equals(userSetting.getServerId())) {
- SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
- app, stream, channelId, mediaTransmissionTCP, platform.isRtcp());
-
- if (sendRtpItem == null) {
- logger.warn("上级点时创建sendRTPItem失败,可能是服务器端口资源不足");
- try {
- responseAck(request, Response.BUSY_HERE);
- } catch (SipException e) {
- logger.error("未处理的异常 ", e);
- } catch (InvalidArgumentException e) {
- logger.error("未处理的异常 ", e);
- } catch (ParseException e) {
- logger.error("未处理的异常 ", e);
- }
- return;
- }
- if (finalTcpActive != null) {
- sendRtpItem.setTcpActive(finalTcpActive);
- }
- sendRtpItem.setPlayType(InviteStreamType.PUSH);
- // 写入redis, 超时时回复
- sendRtpItem.setStatus(1);
- sendRtpItem.setCallId(callIdHeader.getCallId());
-
- sendRtpItem.setFromTag(request.getFromTag());
- SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt);
- if (response != null) {
- sendRtpItem.setToTag(response.getToTag());
- }
- redisCatchStorage.updateSendRTPSever(sendRtpItem);
- } else {
- // 其他平台内容
- otherWvpPushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
- }
- });
-
- // 添加回复的拒绝或者错误的通知
- redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> {
- if (response.getCode() != 0) {
- dynamicTask.stop(callIdHeader.getCallId());
- mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
- try {
- responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
- } catch (SipException | InvalidArgumentException | ParseException e) {
- logger.error("[命令发送失败] 国标级联 点播回复: {}", e.getMessage());
- }
- }
- });
+ zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
+ dynamicTask.stop(callIdHeader.getCallId());
}
}
/**
- * 来自其他wvp的推流
+ * 通知流上线
*/
- private void otherWvpPushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
- CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
- int port, Boolean tcpActive, boolean mediaTransmissionTCP,
- String channelId, String addressStr, String ssrc, String requesterId) {
- logger.info("[级联点播]直播流来自其他平台,发送redis消息");
- // 发送redis消息
- redisGbPlayMsgListener.sendMsg(streamPushItem.getServerId(), streamPushItem.getMediaServerId(),
- streamPushItem.getApp(), streamPushItem.getStream(), addressStr, port, ssrc, requesterId,
- channelId, mediaTransmissionTCP, platform.isRtcp(),platform.getName(), responseSendItemMsg -> {
- SendRtpItem sendRtpItem = responseSendItemMsg.getSendRtpItem();
- if (sendRtpItem == null || responseSendItemMsg.getMediaServerItem() == null) {
- logger.warn("服务器端口资源不足");
- try {
- responseAck(request, Response.BUSY_HERE);
- } catch (SipException e) {
- logger.error("未处理的异常 ", e);
- } catch (InvalidArgumentException e) {
- logger.error("未处理的异常 ", e);
- } catch (ParseException e) {
- logger.error("未处理的异常 ", e);
- }
- return;
- }
- // 收到sendItem
- if (tcpActive != null) {
- sendRtpItem.setTcpActive(tcpActive);
- }
- sendRtpItem.setPlayType(InviteStreamType.PUSH);
- // 写入redis, 超时时回复
- sendRtpItem.setStatus(1);
- sendRtpItem.setCallId(callIdHeader.getCallId());
+ private void notifyPushStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
+ if (!platform.isStartOfflinePush()) {
+ // 平台设置中关闭了拉起离线的推流则直接回复
+ try {
+ logger.info("[上级点播] 失败,推流设备未推流,channel: {}, app: {}, stream: {}", gbStream.getGbId(), gbStream.getApp(), gbStream.getStream());
+ responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
+ } catch (SipException | InvalidArgumentException | ParseException e) {
+ logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
+ }
+ return;
+ }
+ // 发送redis消息以使设备上线
+ logger.info("[ app={}, stream={} ]通道未推流,发送redis信息控制设备开始推流", gbStream.getApp(), gbStream.getStream());
- sendRtpItem.setFromTag(request.getFromTag());
- SIPResponse response = sendStreamAck(responseSendItemMsg.getMediaServerItem(), request, sendRtpItem, platform, evt);
- if (response != null) {
- sendRtpItem.setToTag(response.getToTag());
- }
- redisCatchStorage.updateSendRTPSever(sendRtpItem);
- }, (wvpResult) -> {
+ MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1,
+ gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(),
+ platform.getName(), userSetting.getServerId(), gbStream.getMediaServerId());
+ redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
+ // 设置超时
+ dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
+ logger.info("[ app={}, stream={} ] 等待设备开始推流超时", gbStream.getApp(), gbStream.getStream());
+ try {
+ redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
+ mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
+ responseAck(request, Response.REQUEST_TIMEOUT); // 超时
+ } catch (SipException | InvalidArgumentException | ParseException e) {
+ logger.error("未处理的异常 ", e);
+ }
+ }, userSetting.getPlatformPlayTimeout());
+ // 写入redis待发流信息,供其他wvp读取并生成发流信息
+ SendRtpItem sendRtpItemTemp = new SendRtpItem();
+ sendRtpItemTemp.setIp(addressStr);
+ sendRtpItemTemp.setPort(port);
+ sendRtpItemTemp.setSsrc(ssrc);
+ sendRtpItemTemp.setPlatformId(requesterId);
+ sendRtpItemTemp.setPlatformName(platform.getName());
+ sendRtpItemTemp.setTcp(mediaTransmissionTCP);
+ sendRtpItemTemp.setRtcp(platform.isRtcp());
+ sendRtpItemTemp.setTcpActive(tcpActive);
+ sendRtpItemTemp.setPlayType(InviteStreamType.PUSH);
+ redisCatchStorage.addWaiteSendRtpItem(sendRtpItemTemp, userSetting.getPlatformPlayTimeout());
+ // 添加上线的通知
+ mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (sendRtpItemFromRedis) -> {
+ dynamicTask.stop(callIdHeader.getCallId());
+ redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
+ if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) {
- // 错误
- if (wvpResult.getCode() == RedisGbPlayMsgListener.ERROR_CODE_OFFLINE) {
- // 离线
- // 查询是否在本机上线了
- StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream());
- if (currentStreamPushItem.isPushIng()) {
- // 在线状态
- pushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
-
- } else {
- // 不在线 拉起
- notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
- mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
- }
- }
+ int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
+ if (localPort == 0) {
+ logger.warn("上级点时创建sendRTPItem失败,可能是服务器端口资源不足");
try {
responseAck(request, Response.BUSY_HERE);
- } catch (InvalidArgumentException | ParseException | SipException e) {
- logger.error("[命令发送失败] 国标级联 点播回复 BUSY_HERE: {}", e.getMessage());
+ } catch (SipException e) {
+ logger.error("未处理的异常 ", e);
+ } catch (InvalidArgumentException e) {
+ logger.error("未处理的异常 ", e);
+ } catch (ParseException e) {
+ logger.error("未处理的异常 ", e);
}
- });
+ return;
+ }
+ sendRtpItemTemp.setLocalPort(localPort);
+ sendRtpItemTemp.setLocalIp(ObjectUtils.isEmpty(platform.getSendStreamIp()): );
+ // 写入redis, 超时时回复
+ sendRtpItemTemp.setStatus(1);
+ sendRtpItemTemp.setCallId(callIdHeader.getCallId());
+
+ sendRtpItemTemp.setFromTag(request.getFromTag());
+ SIPResponse response = sendStreamAck(request, sendRtpItemTemp, platform);
+ if (response != null) {
+ sendRtpItemTemp.setToTag(response.getToTag());
+ }
+ redisCatchStorage.updateSendRTPSever(sendRtpItemTemp);
+ } else {
+ // 其他平台内容
+ otherWvpPushStream(sendRtpItemFromRedis, request, platform);
+ }
+ });
+
+ // 添加回复的拒绝或者错误的通知
+ redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> {
+ if (response.getCode() != 0) {
+ dynamicTask.stop(callIdHeader.getCallId());
+ mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
+ try {
+ responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
+ } catch (SipException | InvalidArgumentException | ParseException e) {
+ logger.error("[命令发送失败] 国标级联 点播回复: {}", e.getMessage());
+ }
+ }
+ });
}
- public SIPResponse sendStreamAck(MediaServerItem mediaServerItem, SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt) {
- String sdpIp = mediaServerItem.getSdpIp();
+
+ /**
+ * 来自其他wvp的推流
+ */
+ private void otherWvpPushStream(SendRtpItem sendRtpItem, SIPRequest request, ParentPlatform platform) {
+ logger.info("[级联点播]直播流来自其他平台,发送redis消息");
+ // 发送redis消息
+ redisCatchStorage.sendStartSendRtp(sendRtpItem);
+ // 写入redis, 超时时回复
+ sendRtpItem.setStatus(1);
+ sendRtpItem.setCallId(request.getCallIdHeader().getCallId());
+ sendRtpItem.setFromTag(request.getFromTag());
+ SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
+ if (response != null) {
+ sendRtpItem.setToTag(response.getToTag());
+ }
+ redisCatchStorage.updateSendRTPSever(sendRtpItem);
+ }
+
+ public SIPResponse sendStreamAck(SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform) {
+
+ String sdpIp = sendRtpItem.getLocalIp();
if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) {
sdpIp = platform.getSendStreamIp();
}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java
index 714838edb..6b3c94f8c 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java
@@ -1,5 +1,7 @@
package com.genersoft.iot.vmp.media.zlm.dto;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+
import java.text.ParseException;
/**
@@ -7,5 +9,5 @@ import java.text.ParseException;
*/
public interface ChannelOnlineEvent {
- void run(String app, String stream, String serverId) throws ParseException;
+ void run(SendRtpItem sendRtpItem) throws ParseException;
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java
index 1a9e3e5cf..6a4f866ca 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java
@@ -61,6 +61,7 @@ public class MessageForPushChannel {
messageForPushChannel.setGbId(gbId);
messageForPushChannel.setApp(app);
messageForPushChannel.setStream(stream);
+ messageForPushChannel.setServerId(serverId);
messageForPushChannel.setMediaServerId(mediaServerId);
messageForPushChannel.setPlatFormId(platFormId);
messageForPushChannel.setPlatFormName(platFormName);
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
deleted file mode 100755
index 3b990f006..000000000
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
+++ /dev/null
@@ -1,474 +0,0 @@
-package com.genersoft.iot.vmp.service.redisMsg;
-
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONObject;
-import com.genersoft.iot.vmp.common.VideoManagerConstants;
-import com.genersoft.iot.vmp.conf.DynamicTask;
-import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
-import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
-import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
-import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
-import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
-import com.genersoft.iot.vmp.service.IMediaServerService;
-import com.genersoft.iot.vmp.service.bean.*;
-import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.utils.redis.RedisUtil;
-import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.connection.MessageListener;
-import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.stereotype.Component;
-
-import java.text.ParseException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-
-/**
- * 监听下级发送推送信息,并发送国标推流消息上级
- * @author lin
- */
-@Component
-public class RedisGbPlayMsgListener implements MessageListener {
-
- private final static Logger logger = LoggerFactory.getLogger(RedisGbPlayMsgListener.class);
-
- public static final String WVP_PUSH_STREAM_KEY = "WVP_PUSH_STREAM";
-
- /**
- * 流媒体不存在的错误玛
- */
- public static final int ERROR_CODE_MEDIA_SERVER_NOT_FOUND = -1;
-
- /**
- * 离线的错误玛
- */
- public static final int ERROR_CODE_OFFLINE = -2;
-
- /**
- * 超时的错误玛
- */
- public static final int ERROR_CODE_TIMEOUT = -3;
-
- private Map callbacks = new ConcurrentHashMap<>();
- private Map callbacksForStartSendRtpStream = new ConcurrentHashMap<>();
- private Map callbacksForError = new ConcurrentHashMap<>();
-
- @Autowired
- private UserSetting userSetting;
-
-
- @Autowired
- private RedisTemplate