@@ -14,6 +14,7 @@ import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager ;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform ;
import com.genersoft.iot.vmp.media.zlm.dto.* ;
import com.genersoft.iot.vmp.media.zlm.dto.hook.* ;
import com.genersoft.iot.vmp.service.* ;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage ;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage ;
@@ -108,17 +109,20 @@ public class ZLMHttpHookListener {
*/
@ResponseBody
@PostMapping ( value = " /on_server_keepalive " , produces = " application/json;charset=UTF-8 " )
public JSONObject onServerKeepalive ( @RequestBody JSONObject json ) {
public JSONObject onServerKeepalive ( @RequestBody OnServerKeepaliveHookParam param ) {
logger . info ( " [ ZLM HOOK ]on_server_keepalive API调用, 参数: " + json . toString ( ) ) ;
String mediaServerId = json . getString ( " mediaServerId " ) ;
List < ZlmHttpHookSubscribe . Event > subscribes = this . subscribe . getSubscribes ( HookType . on_server_keepalive ) ;
if ( subscribes ! = null & & subscribes . size ( ) > 0 ) {
for ( ZlmHttpHookSubscribe . Event subscribe : subscribes ) {
subscribe . response ( null , json ) ;
logger . info ( " [ZLM HOOK] 收到zlm心跳: " + param . getMediaServerId ( ) ) ;
taskExecutor . execute ( ( ) - > {
List < ZlmHttpHookSubscribe . Event > subscribes = this . subscribe. getSubscribes ( HookType . on_server_keepalive ) ;
JSONObject json = ( JSONObject ) JSON . toJSON ( param ) ;
if ( subscribes ! = null & & subscribes . size ( ) > 0 ) {
for ( ZlmHttpHookSubscribe . Event subscribe : subscribes ) {
subscribe . response ( null , json ) ;
}
}
}
mediaServerService . updateMediaServerKeepalive ( m ediaServerId, json . getJSONObject ( " d ata " ) ) ;
} ) ;
mediaServerService . updateMediaServerKeepalive ( param . getM ediaServerId( ) , param . getD ata ( ) ) ;
JSONObject ret = new JSONObject ( ) ;
ret . put ( " code " , 0 ) ;
@@ -126,43 +130,6 @@ public class ZLMHttpHookListener {
return ret ;
}
/**
* 流量统计事件, 播放器或推流器断开时并且耗用流量超过特定阈值时会触发此事件, 阈值通过配置文件general.flowThreshold配置; 此事件对回复不敏感。
*
*/
@ResponseBody
@PostMapping ( value = " /on_flow_report " , produces = " application/json;charset=UTF-8 " )
public JSONObject onFlowReport ( @RequestBody JSONObject json ) {
if ( logger . isDebugEnabled ( ) ) {
logger . debug ( " [ ZLM HOOK ]on_flow_report API调用, 参数: " + json . toString ( ) ) ;
}
JSONObject ret = new JSONObject ( ) ;
ret . put ( " code " , 0 ) ;
ret . put ( " msg " , " success " ) ;
return ret ;
}
/**
* 访问http文件服务器上hls之外的文件时触发。
*
*/
@ResponseBody
@PostMapping ( value = " /on_http_access " , produces = " application/json;charset=UTF-8 " )
public JSONObject onHttpAccess ( @RequestBody JSONObject json ) {
if ( logger . isDebugEnabled ( ) ) {
logger . debug ( " [ ZLM HOOK ]on_http_access API 调用,参数: " + json . toString ( ) ) ;
}
String mediaServerId = json . getString ( " mediaServerId " ) ;
JSONObject ret = new JSONObject ( ) ;
ret . put ( " code " , 0 ) ;
ret . put ( " err " , " " ) ;
ret . put ( " path " , " " ) ;
ret . put ( " second " , 600 ) ;
return ret ;
}
/**
* 播放器鉴权事件, rtsp/rtmp/http-flv/ws-flv/hls的播放都将触发此鉴权事件。
@@ -171,20 +138,21 @@ public class ZLMHttpHookListener {
@ResponseBody
@PostMapping ( value = " /on_play " , produces = " application/json;charset=UTF-8 " )
public JSONObject onPlay ( @RequestBody OnPlayHookParam param ) {
JSONObject json = ( JSONObject ) JSON . toJSON ( param ) ;
if ( logger . isDebugEnabled ( ) ) {
logger . debug ( " [ ZLM HOOK ]on_play API调用, 参数: " + JSON . toJSONString ( param ) ) ;
logger . debug ( " [ZLM HOOK] 播放鉴权:{}->{} " + param . getMediaServerId ( ) , param ) ;
}
String mediaServerId = param . getMediaServerId ( ) ;
ZlmHttpHookSubscribe . Event subscribe = this . subscribe . sendNotify ( HookType . on_play , json ) ;
if ( subscribe ! = null ) {
MediaServerItem mediaInfo = mediaServerService . getOne ( mediaServerId ) ;
if ( mediaInfo ! = null ) {
subscribe . response ( mediaInfo , json ) ;
taskExecutor . execute ( ( ) - > {
JSONObject json = ( JSONObject ) JSON . toJSON ( param ) ;
ZlmHttpHookSubscribe . Event subscribe = this . subscribe . sendNotify ( HookType . on_play , json ) ;
if ( subscribe ! = null ) {
MediaServerItem mediaInfo = mediaServerService . getOne ( mediaServerId ) ;
if ( mediaInfo ! = null ) {
subscribe . response ( mediaInfo , json ) ;
}
}
}
} ) ;
JSONObject ret = new JSONObject ( ) ;
if ( ! " rtp " . equals ( param . getApp ( ) ) ) {
Map < String , String > paramMap = urlParamToMap ( param . getParams ( ) ) ;
@@ -211,7 +179,7 @@ public class ZLMHttpHookListener {
JSONObject json = ( JSONObject ) JSON . toJSON ( param ) ;
logger . info ( " [ ZLM HOOK ]on_publish API调用, 参数: " + json . toString ( ) ) ;
logger . info ( " [ZLM HOOK]推流鉴权:{}->{} " , param . getMediaServerId ( ) , param ) ;
JSONObject ret = new JSONObject ( ) ;
String mediaServerId = json . getString ( " mediaServerId " ) ;
MediaServerItem mediaInfo = mediaServerService . getOne ( mediaServerId ) ;
@@ -258,21 +226,23 @@ public class ZLMHttpHookListener {
ret . put ( " code " , 0 ) ;
ret . put ( " msg " , " success " ) ;
ret . put ( " enable_hls " , tru e) ;
ret . put ( " enable_hls " , fals e) ;
if ( ! " rtp " . equals ( param . getApp ( ) ) ) {
ret . put ( " enable_audio " , true ) ;
}
ZlmHttpHookSubscribe . Event subscribe = this . subscribe . sendNotify ( HookType . on_publish , json ) ;
if ( subscribe ! = null ) {
if ( mediaInfo ! = null ) {
subscribe . response ( mediaInfo , json ) ;
} else {
ret . put ( " code " , 1 ) ;
ret . put ( " msg " , " zlm not register " ) ;
taskExecutor . execute ( ( ) - > {
ZlmHttpHookSubscribe . Event subscribe = this . subscribe . sendNotify ( HookType . on_publish , json ) ;
if ( subscribe ! = null ) {
if ( mediaInfo ! = null ) {
subscribe . response ( mediaInfo , json ) ;
} else {
ret . put ( " code " , 1 ) ;
ret . put ( " msg " , " zlm not register " ) ;
}
}
}
} ) ;
if ( " rtp " . equals ( param . getApp ( ) ) ) {
ret . put ( " enable_mp4 " , userSetting . getRecordSip ( ) ) ;
@@ -292,113 +262,10 @@ public class ZLMHttpHookListener {
ret . put ( " mp4_max_second " , 10 ) ;
ret . put ( " enable_mp4 " , true ) ;
ret . put ( " enable_audio " , true ) ;
}
}
return ret ;
}
/**
* 录制mp4完成后通知事件; 此事件对回复不敏感。
*
*/
@ResponseBody
@PostMapping ( value = " /on_record_mp4 " , produces = " application/json;charset=UTF-8 " )
public JSONObject onRecordMp4 ( @RequestBody JSONObject json ) {
if ( logger . isDebugEnabled ( ) ) {
logger . debug ( " [ ZLM HOOK ]on_record_mp4 API调用, 参数: " + json . toString ( ) ) ;
}
String mediaServerId = json . getString ( " mediaServerId " ) ;
JSONObject ret = new JSONObject ( ) ;
ret . put ( " code " , 0 ) ;
ret . put ( " msg " , " success " ) ;
return ret ;
}
/**
* 录制hls完成后通知事件; 此事件对回复不敏感。
*
*/
@ResponseBody
@PostMapping ( value = " /on_record_ts " , produces = " application/json;charset=UTF-8 " )
public JSONObject onRecordTs ( @RequestBody JSONObject json ) {
if ( logger . isDebugEnabled ( ) ) {
logger . debug ( " [ ZLM HOOK ]on_record_ts API调用, 参数: " + json . toString ( ) ) ;
}
String mediaServerId = json . getString ( " mediaServerId " ) ;
JSONObject ret = new JSONObject ( ) ;
ret . put ( " code " , 0 ) ;
ret . put ( " msg " , " success " ) ;
return ret ;
}
/**
* rtsp专用的鉴权事件, 先触发on_rtsp_realm事件然后才会触发on_rtsp_auth事件。
*
*/
@ResponseBody
@PostMapping ( value = " /on_rtsp_realm " , produces = " application/json;charset=UTF-8 " )
public JSONObject onRtspRealm ( @RequestBody JSONObject json ) {
if ( logger . isDebugEnabled ( ) ) {
logger . debug ( " [ ZLM HOOK ]on_rtsp_realm API调用, 参数: " + json . toString ( ) ) ;
}
String mediaServerId = json . getString ( " mediaServerId " ) ;
JSONObject ret = new JSONObject ( ) ;
ret . put ( " code " , 0 ) ;
ret . put ( " realm " , " " ) ;
return ret ;
}
/**
* 该rtsp流是否开启rtsp专用方式的鉴权事件, 开启后才会触发on_rtsp_auth事件。需要指出的是rtsp也支持url参数鉴权, 它支持两种方式鉴权。
*
*/
@ResponseBody
@PostMapping ( value = " /on_rtsp_auth " , produces = " application/json;charset=UTF-8 " )
public JSONObject onRtspAuth ( @RequestBody JSONObject json ) {
if ( logger . isDebugEnabled ( ) ) {
logger . debug ( " [ ZLM HOOK ]on_rtsp_auth API调用, 参数: " + json . toString ( ) ) ;
}
String mediaServerId = json . getString ( " mediaServerId " ) ;
JSONObject ret = new JSONObject ( ) ;
ret . put ( " code " , 0 ) ;
ret . put ( " encrypted " , false ) ;
ret . put ( " passwd " , " test " ) ;
return ret ;
}
/**
* shell登录鉴权, ZLMediaKit提供简单的telnet调试方式, 使用telnet 127.0.0.1 9000能进入MediaServer进程的shell界面。
*
*/
@ResponseBody
@PostMapping ( value = " /on_shell_login " , produces = " application/json;charset=UTF-8 " )
public JSONObject onShellLogin ( @RequestBody JSONObject json ) {
if ( logger . isDebugEnabled ( ) ) {
logger . debug ( " [ ZLM HOOK ]on_shell_login API调用, 参数: " + json . toString ( ) ) ;
}
String mediaServerId = json . getString ( " mediaServerId " ) ;
ZlmHttpHookSubscribe . Event subscribe = this . subscribe . sendNotify ( HookType . on_shell_login , json ) ;
if ( subscribe ! = null ) {
MediaServerItem mediaInfo = mediaServerService . getOne ( mediaServerId ) ;
if ( mediaInfo ! = null ) {
subscribe . response ( mediaInfo , json ) ;
}
}
JSONObject ret = new JSONObject ( ) ;
ret . put ( " code " , 0 ) ;
ret . put ( " msg " , " success " ) ;
return ret ;
}
/**
* rtsp/rtmp流注册或注销时触发此事件; 此事件对回复不敏感。
@@ -406,137 +273,139 @@ public class ZLMHttpHookListener {
*/
@ResponseBody
@PostMapping ( value = " /on_stream_changed " , produces = " application/json;charset=UTF-8 " )
public JSONObject onStreamChanged ( @RequestBody MediaIte m ite m) {
public JSONObject onStreamChanged ( @RequestBody OnStreamChangedHookPara m para m) {
logger . info ( " [ ZLM HOOK ]on_stream_changed API调用, 参数: " + JSONObject . toJSONString ( item ) ) ;
String mediaServerId = item . getMediaServerId ( ) ;
JSONObject json = ( JSONObject ) JSON . toJSON ( item ) ;
ZlmHttpHookSubscribe . Event subscribe = this . subscribe . sendNotify ( HookType . on_stream_changed , json ) ;
if ( subscribe ! = null ) {
MediaServerItem mediaInfo = mediaServerService . getOne ( mediaServerId ) ;
if ( mediaInfo ! = null ) {
subscribe . response ( mediaInfo , json ) ;
}
}
// 流消失移除redis play
String app = item . getApp ( ) ;
String stream = item . getStream ( ) ;
String schema = item . getSchema ( ) ;
List < MediaItem . MediaTrack > tracks = item . getTracks ( ) ;
boolean regist = item . isRegist ( ) ;
if ( regist ) {
if ( item . getOriginType ( ) = = OriginType . RTMP_PUSH . ordinal ( )
| | item . getOriginType ( ) = = OriginType . RTSP_PUSH . ordinal ( )
| | item . getOriginType ( ) = = OriginType . RTC_PUSH . ordinal ( ) ) {
StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage . getStreamAuthorityInfo ( app , stream ) ;
if ( streamAuthorityInfo = = null ) {
streamAuthorityInfo = StreamAuthorityInfo . getInstanceByHook ( item ) ;
} else {
streamAuthorityInfo . setOriginType ( item . getOriginType ( ) ) ;
streamAuthorityInfo . setOriginTypeStr ( item . getOriginTypeStr ( ) ) ;
}
redisCatchStorage . updateStreamAuthorityInfo ( app , stream , streamAuthorityInfo ) ;
}
if ( param . isRegist ( ) ) {
logger . info ( " [ZLM HOOK] 流注册, {}->{}->{}/{} " , param . getMediaServerId ( ) , param . getSchema ( ) , param . getApp ( ) , param . getStream ( ) ) ;
} else {
redisCatchStorage . removeStreamAuthorityInfo ( app , stream ) ;
logger . info ( " [ZLM HOOK] 流注销, {}->{}->{}/{} " , param . getMediaServerId ( ) , param . getSchema ( ) , param . getApp ( ) , param . getStream ( ) ) ;
}
if ( " rtsp " . equals ( schema ) ) {
logger . info ( " on_stream_changed: 注册->{}, app->{}, stream->{} " , regist , app , stream ) ;
if ( regist ) {
mediaServerService . addCount ( mediaServerId ) ;
} else {
mediaServerService . removeCount ( mediaServerId ) ;
}
if ( item . getOriginType ( ) = = OriginType . PULL . ordinal ( )
| | item . getOriginType ( ) = = OriginType . FFMPEG_PULL . ordinal ( ) ) {
// 设置拉流代理上线/离线
streamProxyService . updateStatus ( regist , app , stream ) ;
}
if ( " rtp " . equals ( app ) & & ! regist ) {
StreamInfo streamInfo = redisCatchStorage . queryPlayByStreamId ( stream ) ;
if ( streamInfo ! = null ) {
redisCatchStorage . stopPlay ( streamInfo ) ;
storager . stopPlay ( streamInfo . getDeviceID ( ) , streamInfo . getChannelId ( ) ) ;
} else {
streamInfo = redisCatchStorage . queryPlayback ( null , null , stream , null ) ;
if ( streamInfo ! = null ) {
redisCatchStorage . stopPlayback ( streamInfo . getDeviceID ( ) , streamInfo . getChannelId ( ) ,
streamInfo . getStream ( ) , null ) ;
}
}
} else {
if ( ! " rtp " . equals ( app ) ) {
String type = OriginType . values ( ) [ item . getOriginType ( ) ] . getType ( ) ;
MediaServerItem mediaServerItem = mediaServerService . getOne ( mediaServerId ) ;
if ( mediaServerItem ! = null ) {
if ( regist ) {
StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage . getStreamAuthorityInfo ( app , stream ) ;
String callId = null ;
if ( streamAuthority Info ! = null ) {
callId = streamAuthorityInfo . getCallId ( ) ;
}
StreamInfo streamInfoByAppAndStream = mediaService . getStreamInfoByAppAndStream ( mediaServerItem ,
app , stream , tracks , callId ) ;
item . setStreamInfo ( streamInfoByAppAndStream ) ;
redisCatchStorage . addStream ( mediaServerItem , type , app , stream , item ) ;
if ( item . getOriginType ( ) = = OriginType . RTSP_PUSH . ordinal ( )
| | item . getOriginType ( ) = = OriginType . RTMP_PUSH . ordinal ( )
| | item . getOriginType ( ) = = OriginType . RTC_PUSH . ordinal ( ) ) {
item . setSeverId ( userSetting . getServerId ( ) ) ;
zlmMediaListManager . addPush ( item ) ;
}
} else {
// 兼容流注销时类型从redis记录获取
MediaItem mediaItem = redisCatchStorage . getStreamInfo ( app , stream , mediaServerId ) ;
if ( mediaItem ! = null ) {
type = OriginType . values ( ) [ mediaItem . getOriginType ( ) ] . getType ( ) ;
redisCatchStorage . removeStream ( mediaServerItem . getId ( ) , type , app , stream ) ;
}
GbStream gbStream = storager . getGbStream ( app , stream ) ;
if ( gbStream ! = null ) {
// eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
}
zlmMediaListManager . removeMedia ( app , stream ) ;
}
if ( type ! = null ) {
// 发送流变化redis消息
JSONObject jsonObject = new JSONObject ( ) ;
jsonObject . put ( " serverId " , userSetting . getServerId ( ) ) ;
jsonObject . put ( " app " , app ) ;
jsonObject . put ( " stream " , stream ) ;
jsonObject . put ( " register " , regist ) ;
jsonObject . put ( " mediaServerId " , mediaServerId ) ;
redisCatchStorage . sendStreamChangeMsg ( type , jsonObject ) ;
}
}
JSONObject json = ( JSONObject ) JSON . toJSON ( param ) ;
taskExecutor . execute ( ( ) - > {
ZlmHttpHookSubscribe . Event subscribe = this . subscribe . sendNotify ( HookType . on_stream_changed , json ) ;
if ( subscribe ! = null ) {
MediaServerItem media Info = mediaServerService . getOne ( param . getMediaServerId ( ) ) ;
if ( mediaInfo ! = null ) {
subscribe . response ( mediaInfo , json ) ;
}
}
if ( ! reg ist ) {
List < SendRtpItem > sendRtpItems = redisCatchStorage . querySendRTPServerByStream ( stream ) ;
if ( sendRtpItems . size ( ) > 0 ) {
for ( SendRtpItem sendRtpItem : sendRtpItems ) {
if ( sendRtpItem . getApp ( ) . equals ( app ) ) {
String platformId = sendRtpItem . getPlatformId ( ) ;
ParentPlatform platform = storager . queryParentPlatByServerGBId ( platformId ) ;
Device device = deviceService . getDevice ( platformId ) ;
// 流消失移除 red is play
List < OnStreamChangedHookParam . MediaTrack > tracks = param . getTracks ( ) ;
if ( param . isRegist ( ) ) {
i f ( param . getOriginType ( ) = = OriginType . RTMP_PUSH . ordinal ( )
| | param . getOriginType ( ) = = OriginType . RTSP_PUSH . ordinal ( )
| | param . getOriginType ( ) = = OriginType . RTC_PUSH . ordinal ( ) ) {
try {
if ( platform ! = null ) {
commanderFroPlatform . streamByeCmd ( platform , sendRtpIte m ) ;
} else {
cmder . streamByeCmd ( device , sendRtpItem . g etChannelId ( ) , stream , sendRtpItem . getCallId ( ) ) ;
StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage . getStreamAuthorityInfo ( param . getApp ( ) , param . getStream ( ) ) ;
if ( streamAuthorityInfo = = null ) {
streamAuthorityInfo = StreamAuthorityInfo . getInstanceByHook ( para m ) ;
} else {
streamAuthorityInfo . s etOriginType ( param . getOriginType ( ) ) ;
streamAuthorityInfo . setOriginTypeStr ( param . getOriginTypeStr ( ) ) ;
}
redisCatchStorage . updateStreamAuthorityInfo ( param . getApp ( ) , param . getStream ( ) , streamAuthorityInfo ) ;
}
} else {
redisCatchStorage . removeStreamAuthorityInfo ( param . getApp ( ) , param . getStream ( ) ) ;
}
if ( " rtsp " . equals ( param . getSchema ( ) ) ) {
if ( param . isRegist ( ) ) {
mediaServerService . addCount ( param . getMediaServerId ( ) ) ;
} else {
mediaServerService . removeCount ( param . getMediaServerId ( ) ) ;
}
if ( param . getOriginType ( ) = = OriginType . PULL . ordinal ( )
| | param . getOriginType ( ) = = OriginType . FFMPEG_PULL . ordinal ( ) ) {
// 设置拉流代理上线/离线
streamProxyService . updateStatus ( param . isRegist ( ) , param . getApp ( ) , param . getStream ( ) ) ;
}
if ( " rtp " . equals ( param . getApp ( ) ) & & ! param . isRegist ( ) ) {
StreamInfo streamInfo = redisCatchStorage . queryPlayByStreamId ( param . getStream ( ) ) ;
if ( streamInfo ! = null ) {
redisCatchStorage . stopPlay ( streamInfo ) ;
storager . stopPlay ( streamInfo . getDeviceID ( ) , streamInfo . getChannelId ( ) ) ;
} else {
streamInfo = redisCatchStorage . queryPlayback ( null , null , param . getStream ( ) , null ) ;
if ( streamInfo ! = null ) {
redisCatchStorage . stopPlayback ( streamInfo . getDeviceID ( ) , streamInfo . getChannelId ( ) ,
streamInfo . getStream ( ) , null ) ;
}
}
} else {
if ( ! " rtp " . equals ( param . getApp ( ) ) ) {
String type = OriginType . values ( ) [ param . getOriginType ( ) ] . getType ( ) ;
MediaServerItem mediaServerItem = mediaServerService . getOne ( param . getMediaServerId ( ) ) ;
if ( mediaServerItem ! = null ) {
if ( param . isRegist ( ) ) {
StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage . getStreamAuthorityInfo ( param . getApp ( ) , param . getStream ( ) ) ;
String callId = null ;
if ( streamAuthorityInfo ! = null ) {
callId = streamAuthorityInfo . getCallId ( ) ;
}
StreamInfo streamInfoByAppAndStream = mediaService . getStreamInfoByAppAndStream ( mediaServerItem ,
param . getApp ( ) , param . getStream ( ) , tracks , callId ) ;
param . setStreamInfo ( streamInfoByAppAndStream ) ;
redisCatchStorage . addStream ( mediaServerItem , type , param . getApp ( ) , param . getStream ( ) , param ) ;
if ( param . getOriginType ( ) = = OriginType . RTSP_PUSH . ordinal ( )
| | param . getOriginType ( ) = = OriginType . RTMP_PUSH . ordinal ( )
| | param . getOriginType ( ) = = OriginType . RTC_PUSH . ordinal ( ) ) {
param . setSeverId ( userSetting . getServerId ( ) ) ;
zlmMediaListManager . addPush ( param ) ;
}
} else {
// 兼容流注销时类型从redis记录获取
OnStreamChangedHookParam onStreamChangedHookParam = redisCatchStorage . getStreamInfo ( param . getApp ( ) , param . getStream ( ) , param . getMediaServerId ( ) ) ;
if ( onStreamChangedHookParam ! = null ) {
type = OriginType . values ( ) [ onStreamChangedHookParam . getOriginType ( ) ] . getType ( ) ;
redisCatchStorage . removeStream ( mediaServerItem . getId ( ) , type , param . getApp ( ) , param . getStream ( ) ) ;
}
GbStream gbStream = storager . getGbStream ( param . getApp ( ) , param . getStream ( ) ) ;
if ( gbStream ! = null ) {
// eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
}
zlmMediaListManager . removeMedia ( param . getApp ( ) , param . getStream ( ) ) ;
}
if ( type ! = null ) {
// 发送流变化redis消息
JSONObject jsonObject = new JSONObject ( ) ;
jsonObject . put ( " serverId " , userSetting . getServerId ( ) ) ;
jsonObject . put ( " app " , param . getApp ( ) ) ;
jsonObject . put ( " stream " , param . getStream ( ) ) ;
jsonObject . put ( " register " , param . isRegist ( ) ) ;
jsonObject . put ( " mediaServerId " , param . getMediaServerId ( ) ) ;
redisCatchStorage . sendStreamChangeMsg ( type , jsonObject ) ;
}
}
}
}
if ( ! param . isRegist ( ) ) {
List < SendRtpItem > sendRtpItems = redisCatchStorage . querySendRTPServerByStream ( param . getStream ( ) ) ;
if ( sendRtpItems . size ( ) > 0 ) {
for ( SendRtpItem sendRtpItem : sendRtpItems ) {
if ( sendRtpItem . getApp ( ) . equals ( param . getApp ( ) ) ) {
String platformId = sendRtpItem . getPlatformId ( ) ;
ParentPlatform platform = storager . queryParentPlatByServerGBId ( platformId ) ;
Device device = deviceService . getDevice ( platformId ) ;
try {
if ( platform ! = null ) {
commanderFroPlatform . streamByeCmd ( platform , sendRtpItem ) ;
} else {
cmder . streamByeCmd ( device , sendRtpItem . getChannelId ( ) , param . getStream ( ) , sendRtpItem . getCallId ( ) ) ;
}
} catch ( SipException | InvalidArgumentException | ParseException | SsrcTransactionNotFoundException e ) {
logger . error ( " [命令发送失败] 国标级联 发送BYE: {} " , e . getMessage ( ) ) ;
}
} catch ( SipException | InvalidArgumentException | ParseException | SsrcTransactionNotFoundException e ) {
logger . error ( " [命令发送失败] 国标级联 发送BYE: {} " , e . getMessage ( ) ) ;
}
}
}
}
}
}
} ) ;
JSONObject ret = new JSONObject ( ) ;
ret . put ( " code " , 0 ) ;
@@ -550,19 +419,16 @@ public class ZLMHttpHookListener {
*/
@ResponseBody
@PostMapping ( value = " /on_stream_none_reader " , produces = " application/json;charset=UTF-8 " )
public JSONObject onStreamNoneReader ( @RequestBody JSONObject json ) {
public JSONObject onStreamNoneReader ( @RequestBody OnStreamNoneReaderHookParam param ) {
logger . info ( " [ ZLM HOOK ]on_stream_none_reader API调用, 参数: " + json . toString ( ) ) ;
String mediaServerId = json . getString ( " mediaServerId " ) ;
String streamId = json . getString ( " stream " ) ;
String app = json . getString ( " app " ) ;
logger . info ( " [ZLM HOOK]流无人观看:{]->{}->{}/{} " + param . getMediaServerId ( ) , param . getSchema ( ) , param . getApp ( ) , param . getStream ( ) ) ;
JSONObject ret = new JSONObject ( ) ;
ret . put ( " code " , 0 ) ;
// 录像下载
ret . put ( " close " , userSetting . getStreamOnDemand ( ) ) ;
if ( " rtp " . equals ( app ) ) {
if ( " rtp " . equals ( param . getApp ( ) ) ) {
ret . put ( " close " , userSetting . getStreamOnDemand ( ) ) ;
// 国标流, 点播/录像回放/录像下载
StreamInfo streamInfoForPlayCatch = redisCatchStorage . queryPlayByStreamId ( streamId ) ;
StreamInfo streamInfoForPlayCatch = redisCatchStorage . queryPlayByStreamId ( param . getStream ( ) ) ;
// 点播
if ( streamInfoForPlayCatch ! = null ) {
// 收到无人观看说明流也没有在往上级推送
@@ -596,7 +462,7 @@ public class ZLMHttpHookListener {
return ret ;
}
// 录像回放
StreamInfo streamInfoForPlayBackCatch = redisCatchStorage . queryPlayback ( null , null , streamId , null ) ;
StreamInfo streamInfoForPlayBackCatch = redisCatchStorage . queryPlayback ( null , null , param . getStream ( ) , null ) ;
if ( streamInfoForPlayBackCatch ! = null ) {
if ( streamInfoForPlayBackCatch . isPause ( ) ) {
ret . put ( " close " , false ) ;
@@ -617,7 +483,7 @@ public class ZLMHttpHookListener {
return ret ;
}
// 录像下载
StreamInfo streamInfoForDownload = redisCatchStorage . queryDownload ( null , null , streamId , null ) ;
StreamInfo streamInfoForDownload = redisCatchStorage . queryDownload ( null , null , param . getStream ( ) , null ) ;
// 进行录像下载时无人观看不断流
if ( streamInfoForDownload ! = null ) {
ret . put ( " close " , false ) ;
@@ -626,19 +492,19 @@ public class ZLMHttpHookListener {
} else {
// 非国标流 推流/拉流代理
// 拉流代理
StreamProxyItem streamProxyItem = streamProxyService . getStreamProxyByAppAndStream ( app , streamId ) ;
StreamProxyItem streamProxyItem = streamProxyService . getStreamProxyByAppAndStream ( param . getApp ( ) , param . getStream ( ) ) ;
if ( streamProxyItem ! = null ) {
if ( streamProxyItem . isEnable_remove_none_reader ( ) ) {
// 无人观看自动移除
ret . put ( " close " , true ) ;
streamProxyService . del ( app , streamId ) ;
streamProxyService . del ( param . getApp ( ) , param . getStream ( ) ) ;
String url = streamProxyItem . getUrl ( ) ! = null ? streamProxyItem . getUrl ( ) : streamProxyItem . getSrc_url ( ) ;
logger . info ( " [{}/{}]<-[{}] 拉流代理无人观看已经移除 " , app , streamId , url ) ;
logger . info ( " [{}/{}]<-[{}] 拉流代理无人观看已经移除 " , param . getApp ( ) , param . getStream ( ) , url ) ;
} else if ( streamProxyItem . isEnable_disable_none_reader ( ) ) {
// 无人观看停用
ret . put ( " close " , true ) ;
// 修改数据
streamProxyService . stop ( app , streamId ) ;
streamProxyService . stop ( param . getApp ( ) , param . getStream ( ) ) ;
} else {
ret . put ( " close " , false ) ;
}
@@ -660,35 +526,33 @@ public class ZLMHttpHookListener {
*/
@ResponseBody
@PostMapping ( value = " /on_stream_not_found " , produces = " application/json;charset=UTF-8 " )
public JSONObject onStreamNotFound ( @RequestBody JSONObject json ) {
if ( logger . isDebugEnabled ( ) ) {
logger . debug ( " [ ZLM HOOK ]on_stream_not_found API调用, 参数: " + json . toString ( ) ) ;
}
String mediaServerId = json . getString ( " mediaServerId " ) ;
MediaServerItem mediaInfo = mediaServerService . getOne ( mediaServerId ) ;
if ( userSetting . isAutoApplyPlay ( ) & & mediaInfo ! = null ) {
String app = json . getString ( " app " ) ;
String streamId = json . getString ( " stream " ) ;
if ( " rtp " . equals ( app ) ) {
if ( mediaInfo . isRtpEnable ( ) ) {
String [ ] s = streamId . split ( " _ " ) ;
if ( s . length = = 2 ) {
String deviceId = s [ 0 ] ;
String channelId = s [ 1 ] ;
Device device = redisCatchStorage . getDevice ( deviceId ) ;
if ( device ! = null ) {
playService . play ( mediaInfo , deviceId , channelId , null , null , null ) ;
public JSONObject onStreamNotFound ( @RequestBody OnStreamNotFoundHookParam param ) {
logger . info ( " [ZLM HOOK] 流未找到:{}->{}->{}/{} " + param . getMediaServerId ( ) , param . getSchema ( ) , param . getApp ( ) , param . getStream ( ) ) ;
taskExecutor . execute ( ( ) - > {
MediaServerItem mediaInfo = mediaServerService . getOne ( param . getMediaServerId ( ) ) ;
if ( userSetting . isAutoApplyPlay ( ) & & mediaInfo ! = null ) {
if ( " rtp " . equals ( param . getApp ( ) ) ) {
if ( mediaInfo . isRtpEnable ( ) ) {
String [ ] s = param . getStream ( ) . split ( " _ " ) ;
if ( s . length = = 2 ) {
String deviceId = s [ 0 ] ;
String channelId = s [ 1 ] ;
Device device = redisCatchStorage . getDevice ( deviceId ) ;
if ( device ! = null ) {
playService . play ( mediaInfo , deviceId , channelId , null , null , null ) ;
}
}
}
}
} else {
// 拉流代理
StreamProxyItem streamProxyByAppAndStream = streamProxyService . getStreamProxyByAppAndStream ( app , streamId ) ;
if ( streamProxyByAppAndStream ! = null & & streamProxyByAppAndStream . isEnable_disable_none_reader ( ) ) {
streamProxyService . start ( app , streamId ) ;
} else {
// 拉流代理
StreamProxyItem streamProxyByAppAndStream = streamProxyService . getStreamProxyByAppAndStream ( param . getApp ( ) , param . getStream ( ) ) ;
if ( streamProxyByAppAndStream ! = null & & streamProxyByAppAndStream . isEnable_disable_none_reader ( ) ) {
streamProxyService . start ( param . getApp ( ) , param . getStream ( ) ) ;
}
}
}
}
} ) ;
JSONObject ret = new JSONObject ( ) ;
ret . put ( " code " , 0 ) ;
@@ -704,22 +568,20 @@ public class ZLMHttpHookListener {
@PostMapping ( value = " /on_server_started " , produces = " application/json;charset=UTF-8 " )
public JSONObject onServerStarted ( HttpServletRequest request , @RequestBody JSONObject jsonObject ) {
if ( logger . isDebugEnabled ( ) ) {
logger . debug ( " [ ZLM HOOK ]on_server_started API调用, 参数: " + jsonObject . toString ( ) ) ;
}
String remoteAddr = request . getRemoteAddr ( ) ;
jsonObject . p ut( " ip " , remoteAddr ) ;
List < ZlmHttpHookSubscribe . Event > subscribes = this . subscribe . getSubscribes ( HookType . on_server_started ) ;
if ( subscribes ! = null & & subscribes . size ( ) > 0 ) {
for ( ZlmHttpHookSubscribe . Event subscribe : subscribes ) {
subscribe . response ( null , jsonObject ) ;
jsonObject . put ( " ip " , request . getRemoteAddr ( ) ) ;
ZLMServerConfig zlmServerConfig = JSON . to ( ZLMServerConfig . class , jsonObject ) ;
zlmServerConfig . setIp ( request . getRemoteAddr ( ) ) ;
logger . info ( " [ZLM HOOK] zlm 启动 " + zlmServerConfig . getGeneralMediaServerId ( ) ) ;
taskExecutor . exec ute ( ( ) - > {
List < ZlmHttpHookSubscribe . Event > subscribes = this . subscribe . getSubscribes ( HookType . on_server_started ) ;
if ( subscribes ! = null & & subscribes . size ( ) > 0 ) {
for ( ZlmHttpHookSubscribe . Event subscribe : subscribes ) {
subscribe . response ( null , jsonObject ) ;
}
}
}
ZLMServerConfig zlmServerConfig = jsonObject . to ( ZLMServerConfig . class ) ;
if ( zlmServerConfig ! = null ) {
mediaServerService . zlmServerOnline ( zlmServerConfig ) ;
}
} ) ;
JSONObject ret = new JSONObject ( ) ;
ret . put ( " code " , 0 ) ;
ret . put ( " msg " , " success " ) ;
@@ -731,33 +593,33 @@ public class ZLMHttpHookListener {
*/
@ResponseBody
@PostMapping ( value = " /on_send_rtp_stopped " , produces = " application/json;charset=UTF-8 " )
public JSONObject onSendRtpStopped ( HttpServletRequest request , @RequestBody JSONObject jsonObject ) {
public JSONObject onSendRtpStopped ( HttpServletRequest request , @RequestBody OnSendRtpStoppedHookParam param ) {
logger . info ( " [ ZLM HOOK ]on_send_rtp_stopped API调用, 参数: " + jsonObject ) ;
logger . info ( " [ZLM HOOK] 发送rtp被动关闭: {}->{}/{} " , param . getMediaServerId ( ) , param . getApp ( ) , param . getStream ( ) ) ;
JSONObject ret = new JSONObject ( ) ;
ret . put ( " code " , 0 ) ;
ret . put ( " msg " , " success " ) ;
// 查找对应的上级推流,发送停止
String app = jsonObject . getString ( " app " ) ;
if ( ! " rtp " . equals ( app ) ) {
if ( ! " rtp " . equals ( param . getApp ( ) ) ) {
return ret ;
}
String stream = jsonObject . getString ( " stream " ) ;
List < SendRtpItem > sendRtpItems = redisCatchStorage . querySendRTPServerByStream ( stream ) ;
if ( sendRtpItems . size ( ) > 0 ) {
for ( SendRtpItem sendRtpItem : sendRtpItems ) {
ParentPlatform parentPlatform = storager . queryParentPlatByServerGBId ( sendRtpItem . getPlatformId ( ) ) ;
try {
commanderFroPlatform . streamByeCmd ( parentPlatform , sendRtpItem . getCallId ( ) ) ;
} catch ( SipException | InvalidArgumentException | ParseException e ) {
logger . error ( " [命令发送失败] 国标级联 发送BYE: {} " , e . getMessage ( ) ) ;
taskExecutor . execute ( ( ) - > {
List < SendRtpItem > sendRtpItems = redisCatchStorage . querySendRTPServerByStream ( param . getStream ( ) ) ;
if ( sendRtpItems . size ( ) > 0 ) {
for ( SendRtpItem sendRtpItem : sendRtpItems ) {
ParentPlatform parentPlatform = storager . queryParentPlatByServerGBId ( sendRtpItem . getPlatformId ( ) ) ;
try {
commanderFroPlatform . streamByeCmd ( parentPlatform , sendRtpItem . getCallId ( ) ) ;
} catch ( SipException | InvalidArgumentException | ParseException e ) {
logger . error ( " [命令发送失败] 国标级联 发送BYE: {} " , e . getMessage ( ) ) ;
}
redisCatchStorage . deleteSendRTPServer ( parentPlatform . getServerGBId ( ) , sendRtpItem . getChannelId ( ) ,
sendRtpItem . getCallId ( ) , sendRtpItem . getStreamId ( ) ) ;
}
redisCatchStorage . deleteSendRTPServer ( parentPlatform . getServerGBId ( ) , sendRtpItem . getChannelId ( ) ,
sendRtpItem . getCallId ( ) , sendRtpItem . getStreamId ( ) ) ;
}
}
} ) ;
return ret ;