@@ -9,21 +9,16 @@ import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher ;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager ;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager ;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform ;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander ;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform ;
import com.genersoft.iot.vmp.media.zlm.dto.HookType ;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem ;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo ;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem ;
import com.genersoft.iot.vmp.media.zlm.dto.hook.* ;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage ;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform ;
import com.genersoft.iot.vmp.media.zlm.dto.* ;
import com.genersoft.iot.vmp.service.* ;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage ;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage ;
import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult ;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import org.springframework.beans.factory.annotation.Autowired ;
@@ -294,9 +289,9 @@ public class ZLMHttpHookListener {
JSONObject json = ( JSONObject ) JSON . toJSON ( param ) ;
taskExecutor . execute ( ( ) - > {
taskExecutor . execute ( ( ) - > {
ZlmHttpHookSubscribe . Event subscribe = this . subscribe . sendNotify ( HookType . on_stream_changed , json ) ;
if ( subscribe ! = null ) {
if ( subscribe ! = null ) {
MediaServerItem mediaInfo = mediaServerService . getOne ( param . getMediaServerId ( ) ) ;
if ( mediaInfo ! = null ) {
subscribe . response ( mediaInfo , json ) ;
@@ -312,15 +307,16 @@ public class ZLMHttpHookListener {
StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage . getStreamAuthorityInfo ( param . getApp ( ) , param . getStream ( ) ) ;
if ( streamAuthorityInfo = = null ) {
streamAuthorityInfo = StreamAuthorityInfo . getInstanceByHook ( param ) ;
} else {
} else {
streamAuthorityInfo . setOriginType ( param . getOriginType ( ) ) ;
streamAuthorityInfo . setOriginTypeStr ( param . getOriginTypeStr ( ) ) ;
}
redisCatchStorage . updateStreamAuthorityInfo ( param . getApp ( ) , param . getStream ( ) , streamAuthorityInfo ) ;
}
} else {
} else {
redisCatchStorage . removeStreamAuthorityInfo ( param . getApp ( ) , param . getStream ( ) ) ;
}
} ) ;
if ( " rtsp " . equals ( param . getSchema ( ) ) ) {
logger . info ( " on_stream_changed: 注册->{}, app->{}, stream->{} " , param . isRegist ( ) , param . getApp ( ) , param . getStream ( ) ) ;
@@ -329,12 +325,12 @@ public class ZLMHttpHookListener {
} else {
mediaServerService . removeCount ( param . getMediaServerId ( ) ) ;
}
if ( ite m. getOriginType ( ) = = OriginType . PULL . ordinal ( )
| | ite m. getOriginType ( ) = = OriginType . FFMPEG_PULL . ordinal ( ) ) {
if ( para m. getOriginType ( ) = = OriginType . PULL . ordinal ( )
| | para m. getOriginType ( ) = = OriginType . FFMPEG_PULL . ordinal ( ) ) {
// 设置拉流代理上线/离线
streamProxyService . updateStatus ( param . isRegist ( ) , app , param . getStream ( ) ) ;
streamProxyService . updateStatus ( param . isRegist ( ) , param . getApp ( ) , param . getStream ( ) ) ;
}
if ( " rtp " . equals ( app ) & & ! regist ) {
if ( " rtp " . equals ( param . getApp ( ) ) & & ! param . isRegist ( ) ) {
StreamInfo streamInfo = redisCatchStorage . queryPlayByStreamId ( param . getStream ( ) ) ;
if ( streamInfo ! = null ) {
redisCatchStorage . stopPlay ( streamInfo ) ;
@@ -346,47 +342,49 @@ public class ZLMHttpHookListener {
streamInfo . getStream ( ) , null ) ;
}
}
} else if ( " broadcast " . equals ( app ) ) {
} else if ( " broadcast " . equals ( param . getApp ( ) ) ) {
// 语音对讲推流 stream需要满足格式deviceId_channelId
if ( regist & & param . getStream ( ) . indexOf ( " _ " ) > 0 ) {
if ( param . isRegist ( ) & & param . getStream ( ) . indexOf ( " _ " ) > 0 ) {
String [ ] streamArray = param . getStream ( ) . split ( " _ " ) ;
if ( streamArray . length = = 2 ) {
String deviceId = streamArray [ 0 ] ;
String channelId = streamArray [ 1 ] ;
Device device = deviceService . query Device( deviceId ) ;
Device device = deviceService . get Device( deviceId ) ;
if ( device ! = null ) {
DeviceChannel deviceChannel = storager . queryChannel ( deviceId , channelId ) ;
if ( deviceChannel ! = null ) {
if ( audioBroadcastManager . exit ( deviceId , channelId ) ) {
// 直接推流
SendRtpItem sendRtpItem = redisCatchStorage . querySendRTPServer ( null , null , stream , null ) ;
SendRtpItem sendRtpItem = redisCatchStorage . querySendRTPServer ( null , null , param . getStream ( ) , null ) ;
if ( sendRtpItem = = null ) {
// TODO 可能数据错误,重新开启语音通道
} else {
String is_Udp = sendRtpItem . isTcp ( ) ? " 0 " : " 1 " ;
MediaServerItem mediaInfo = mediaServerService . getOne ( sendRtpItem . getMediaServerId ( ) ) ;
logger . info ( " rtp/{}开始向上级推流, 目标={}:{}, SSRC={} " , sendRtpItem . getStreamId ( ) , sendRtpItem . getIp ( ) , sendRtpItem . getPort ( ) , sendRtpItem . getSsrc ( ) ) ;
Map < String , Object > p aram = new HashMap < > ( 12 ) ;
p aram. put ( " vhost " , " __defaultVhost__ " ) ;
p aram. put ( " app " , sendRtpItem . getApp ( ) ) ;
p aram. put ( " stream " , sendRtpItem . getStreamId ( ) ) ;
p aram. put ( " ssrc " , sendRtpItem . getSsrc ( ) ) ;
p aram. put ( " src_port " , sendRtpItem . getLocalPort ( ) ) ;
p aram. put ( " pt " , sendRtpItem . getPt ( ) ) ;
p aram. put ( " use_ps " , sendRtpItem . isUsePs ( ) ? " 1 " : " 0 " ) ;
p aram. put ( " only_audio " , sendRtpItem . isOnlyAudio ( ) ? " 1 " : " 0 " ) ;
Map < String , Object > sendP aram = new HashMap < > ( 12 ) ;
sendP aram. put ( " vhost " , " __defaultVhost__ " ) ;
sendP aram. put ( " app " , sendRtpItem . getApp ( ) ) ;
sendP aram. put ( " stream " , sendRtpItem . getStreamId ( ) ) ;
sendP aram. put ( " ssrc " , sendRtpItem . getSsrc ( ) ) ;
sendP aram. put ( " src_port " , sendRtpItem . getLocalPort ( ) ) ;
sendP aram. put ( " pt " , sendRtpItem . getPt ( ) ) ;
sendP aram. put ( " use_ps " , sendRtpItem . isUsePs ( ) ? " 1 " : " 0 " ) ;
sendP aram. put ( " only_audio " , sendRtpItem . isOnlyAudio ( ) ? " 1 " : " 0 " ) ;
JSONObject jsonObject ;
if ( sendRtpItem . isTcpActive ( ) ) {
jsonObject = zlmrtpServerFactory . startSendRtpPassive ( mediaInfo , p aram) ;
jsonObject = zlmrtpServerFactory . startSendRtpPassive ( mediaInfo , sendP aram) ;
} else {
p aram. put ( " is_udp " , is_Udp ) ;
p aram. put ( " dst_url " , sendRtpItem . getIp ( ) ) ;
p aram. put ( " dst_port " , sendRtpItem . getPort ( ) ) ;
jsonObject = zlmrtpServerFactory . startSendRtpStream ( mediaInfo , p aram) ;
sendP aram. put ( " is_udp " , is_Udp ) ;
sendP aram. put ( " dst_url " , sendRtpItem . getIp ( ) ) ;
sendP aram. put ( " dst_port " , sendRtpItem . getPort ( ) ) ;
jsonObject = zlmrtpServerFactory . startSendRtpStream ( mediaInfo , sendP aram) ;
}
if ( jsonObject ! = null & & jsonObject . getInteger ( " code " ) = = 0 ) {
logger . info ( " [语音对讲] 自动推流成功, device: {}, channel: {} " , deviceId , channelId ) ;
} else {
logger . info ( " [语音对讲] 推流失败, 结果: {} " , jsonObject ) ;
}
}
@@ -406,43 +404,43 @@ public class ZLMHttpHookListener {
}
}
} else if ( " talk " . equals ( app ) ) {
} else if ( " talk " . equals ( param . getApp ( ) ) ) {
// 语音对讲推流 stream需要满足格式deviceId_channelId
if ( regist & & stream . indexOf ( " _ " ) > 0 ) {
String [ ] streamArray = stream . split ( " _ " ) ;
if ( param . isRegist ( ) & & param . getStream ( ) . indexOf ( " _ " ) > 0 ) {
String [ ] streamArray = param . getStream ( ) . split ( " _ " ) ;
if ( streamArray . length = = 2 ) {
String deviceId = streamArray [ 0 ] ;
String channelId = streamArray [ 1 ] ;
Device device = deviceService . query Device( deviceId ) ;
Device device = deviceService . get Device( deviceId ) ;
if ( device ! = null ) {
DeviceChannel deviceChannel = storager . queryChannel ( deviceId , channelId ) ;
if ( deviceChannel ! = null ) {
if ( audioBroadcastManager . exit ( deviceId , channelId ) ) {
// 直接推流
SendRtpItem sendRtpItem = redisCatchStorage . querySendRTPServer ( null , null , stream , null ) ;
SendRtpItem sendRtpItem = redisCatchStorage . querySendRTPServer ( null , null , param . getStream ( ) , null ) ;
if ( sendRtpItem = = null ) {
// TODO 可能数据错误,重新开启语音通道
} else {
MediaServerItem mediaInfo = mediaServerService . getOne ( sendRtpItem . getMediaServerId ( ) ) ;
logger . info ( " rtp/{}开始向上级推流, 目标={}:{}, SSRC={} " , sendRtpItem . getStreamId ( ) , sendRtpItem . getIp ( ) , sendRtpItem . getPort ( ) , sendRtpItem . getSsrc ( ) ) ;
Map < String , Object > p aram = new HashMap < > ( 12 ) ;
p aram. put ( " vhost " , " __defaultVhost__ " ) ;
p aram. put ( " app " , sendRtpItem . getApp ( ) ) ;
p aram. put ( " stream " , sendRtpItem . getStreamId ( ) ) ;
p aram. put ( " ssrc " , sendRtpItem . getSsrc ( ) ) ;
p aram. put ( " src_port " , sendRtpItem . getLocalPort ( ) ) ;
p aram. put ( " pt " , sendRtpItem . getPt ( ) ) ;
p aram. put ( " use_ps " , sendRtpItem . isUsePs ( ) ? " 1 " : " 0 " ) ;
p aram. put ( " only_audio " , sendRtpItem . isOnlyAudio ( ) ? " 1 " : " 0 " ) ;
Map < String , Object > sendP aram = new HashMap < > ( 12 ) ;
sendP aram. put ( " vhost " , " __defaultVhost__ " ) ;
sendP aram. put ( " app " , sendRtpItem . getApp ( ) ) ;
sendP aram. put ( " stream " , sendRtpItem . getStreamId ( ) ) ;
sendP aram. put ( " ssrc " , sendRtpItem . getSsrc ( ) ) ;
sendP aram. put ( " src_port " , sendRtpItem . getLocalPort ( ) ) ;
sendP aram. put ( " pt " , sendRtpItem . getPt ( ) ) ;
sendP aram. put ( " use_ps " , sendRtpItem . isUsePs ( ) ? " 1 " : " 0 " ) ;
sendP aram. put ( " only_audio " , sendRtpItem . isOnlyAudio ( ) ? " 1 " : " 0 " ) ;
JSONObject jsonObject ;
if ( sendRtpItem . isTcpActive ( ) ) {
jsonObject = zlmrtpServerFactory . startSendRtpPassive ( mediaInfo , p aram) ;
jsonObject = zlmrtpServerFactory . startSendRtpPassive ( mediaInfo , sendP aram) ;
} else {
p aram. put ( " is_udp " , sendRtpItem . isTcp ( ) ? " 0 " : " 1 " ) ;
p aram. put ( " dst_url " , sendRtpItem . getIp ( ) ) ;
p aram. put ( " dst_port " , sendRtpItem . getPort ( ) ) ;
jsonObject = zlmrtpServerFactory . startSendRtpStream ( mediaInfo , p aram) ;
sendP aram. put ( " is_udp " , sendRtpItem . isTcp ( ) ? " 0 " : " 1 " ) ;
sendP aram. put ( " dst_url " , sendRtpItem . getIp ( ) ) ;
sendP aram. put ( " dst_port " , sendRtpItem . getPort ( ) ) ;
jsonObject = zlmrtpServerFactory . startSendRtpStream ( mediaInfo , sendP aram) ;
}
if ( jsonObject ! = null & & jsonObject . getInteger ( " code " ) = = 0 ) {
logger . info ( " [语音对讲] 自动推流成功, device: {}, channel: {} " , deviceId , channelId ) ;
@@ -450,7 +448,7 @@ public class ZLMHttpHookListener {
}
} else {
// 开启语音对讲通道
MediaServerItem mediaServerItem = mediaServerService . getOne ( m ediaServerId) ;
MediaServerItem mediaServerItem = mediaServerService . getOne ( param . getM ediaServerId( ) );
playService . talk ( mediaServerItem , device , channelId , ( mediaServer , jsonObject ) - > {
System . out . println ( " 开始推流 " ) ;
} , eventResult - > {
@@ -466,9 +464,9 @@ public class ZLMHttpHookListener {
}
} else {
if ( ! " rtp " . equals ( app ) ) {
String type = OriginType . values ( ) [ ite m. getOriginType ( ) ] . getType ( ) ;
MediaServerItem mediaServerItem = mediaServerService . getOne ( m ediaServerId) ;
if ( ! " rtp " . equals ( param . getApp ( ) ) ) {
String type = OriginType . values ( ) [ para m. getOriginType ( ) ] . getType ( ) ;
MediaServerItem mediaServerItem = mediaServerService . getOne ( param . getM ediaServerId( ) );
if ( mediaServerItem ! = null ) {
if ( param . isRegist ( ) ) {
@@ -478,7 +476,7 @@ public class ZLMHttpHookListener {
callId = streamAuthorityInfo . getCallId ( ) ;
}
StreamInfo streamInfoByAppAndStream = mediaService . getStreamInfoByAppAndStream ( mediaServerItem ,
param . getApp ( ) , param . getStream ( ) , tracks , callId ) ;
param . getApp ( ) , param . getStream ( ) , param . getTracks ( ) , callId ) ;
param . setStreamInfo ( streamInfoByAppAndStream ) ;
redisCatchStorage . addStream ( mediaServerItem , type , param . getApp ( ) , param . getStream ( ) , param ) ;
if ( param . getOriginType ( ) = = OriginType . RTSP_PUSH . ordinal ( )
@@ -489,7 +487,8 @@ public class ZLMHttpHookListener {
}
} else {
// 兼容流注销时类型从redis记录获取
OnStreamChangedHookParam onStreamChangedHookParam = redisCatchStorage . getStreamInfo ( param . getApp ( ) , param . getStream ( ) , param . getMediaServerId ( ) ) ;
OnStreamChangedHookParam onStreamChangedHookParam = redisCatchStorage . getStreamInfo (
param . getApp ( ) , param . getStream ( ) , param . getMediaServerId ( ) ) ;
if ( onStreamChangedHookParam ! = null ) {
type = OriginType . values ( ) [ onStreamChangedHookParam . getOriginType ( ) ] . getType ( ) ;
redisCatchStorage . removeStream ( mediaServerItem . getId ( ) , type , param . getApp ( ) , param . getStream ( ) ) ;
@@ -526,13 +525,13 @@ public class ZLMHttpHookListener {
if ( platform ! = null ) {
commanderFroPlatform . streamByeCmd ( platform , sendRtpItem ) ;
} else {
if ( " talk " . equals ( app ) & & sendRtpItem . isOnlyAudio ( ) ) {
if ( " talk " . equals ( param . getApp ( ) ) & & sendRtpItem . isOnlyAudio ( ) ) {
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager . get ( sendRtpItem . getDeviceId ( ) , sendRtpItem . getChannelId ( ) ) ;
if ( device ! = null & & audioBroadcastCatch ! = null ) {
// cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null);
}
} else {
cmder . streamByeCmd ( device , sendRtpItem . getChannelId ( ) , stream , sendRtpItem . getCallId ( ) ) ;
cmder . streamByeCmd ( device , sendRtpItem . getChannelId ( ) , param . getStream ( ) , sendRtpItem . getCallId ( ) ) ;
}
}
@@ -575,6 +574,9 @@ public class ZLMHttpHookListener {
if ( sendRtpItems . size ( ) > 0 ) {
for ( SendRtpItem sendRtpItem : sendRtpItems ) {
ParentPlatform parentPlatform = storager . queryParentPlatByServerGBId ( sendRtpItem . getPlatformId ( ) ) ;
if ( parentPlatform = = null ) {
continue ;
}
try {
commanderFroPlatform . streamByeCmd ( parentPlatform , sendRtpItem . getCallId ( ) ) ;
} catch ( SipException | InvalidArgumentException | ParseException e ) {