临时提交
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package com.genersoft.iot.vmp.service.redisMsg;
|
||||
|
||||
import com.genersoft.iot.vmp.common.CommonCallback;
|
||||
import com.genersoft.iot.vmp.common.StreamInfo;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
|
||||
|
||||
@@ -19,4 +20,7 @@ public interface IRedisRpcService {
|
||||
void rtpSendStopped(String sendRtpItemKey);
|
||||
|
||||
void removeCallback(long key);
|
||||
|
||||
long onStreamOnlineEvent(String app, String stream, CommonCallback<StreamInfo> callback);
|
||||
void unPushStreamOnlineEvent(String app, String stream);
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.genersoft.iot.vmp.service.redisMsg.control;
|
||||
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.genersoft.iot.vmp.common.StreamInfo;
|
||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||
import com.genersoft.iot.vmp.conf.exception.ControllerException;
|
||||
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
|
||||
@@ -162,6 +163,39 @@ public class RedisRpcController {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 监听流上线
|
||||
*/
|
||||
public RedisRpcResponse onPushStreamOnlineEvent(RedisRpcRequest request) {
|
||||
StreamInfo streamInfo = JSONObject.parseObject(request.getParam().toString(), StreamInfo.class);
|
||||
log.info("[redis-rpc] 监听流上线: {}/{}", streamInfo.getApp(), streamInfo.getStream());
|
||||
// 查询本级是否有这个流
|
||||
StreamInfo streamInfoInServer = mediaServerService.getMediaByAppAndStream(streamInfo.getApp(), streamInfo.getStream());
|
||||
if (streamInfoInServer != null) {
|
||||
log.info("[redis-rpc] 监听流上线时发现流已存在直接返回: {}/{}", streamInfo.getApp(), streamInfo.getStream());
|
||||
RedisRpcResponse response = request.getResponse();
|
||||
response.setBody(streamInfoInServer);
|
||||
response.setStatusCode(200);
|
||||
return response;
|
||||
}
|
||||
// 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
|
||||
Hook hook = Hook.getInstance(HookType.on_media_arrival, streamInfo.getApp(), streamInfo.getStream(), null);
|
||||
hookSubscribe.addSubscribe(hook, (hookData) -> {
|
||||
log.info("[redis-rpc] 监听流上线,流已上线: {}/{}", streamInfo.getApp(), streamInfo.getStream());
|
||||
// 读取redis中的上级点播信息,生成sendRtpItm发送出去
|
||||
RedisRpcResponse response = request.getResponse();
|
||||
response.setBody(mediaServerService.getStreamInfoByAppAndStream(hookData.getMediaServer(),
|
||||
streamInfo.getApp(), streamInfo.getStream(), hookData.getMediaInfo(),
|
||||
hookData.getMediaInfo() != null ? hookData.getMediaInfo().getCallId() : null));
|
||||
response.setStatusCode(200);
|
||||
// 手动发送结果
|
||||
sendResponse(response);
|
||||
hookSubscribe.removeSubscribe(hook);
|
||||
|
||||
});
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 停止监听流上线
|
||||
*/
|
||||
@@ -176,6 +210,20 @@ public class RedisRpcController {
|
||||
return response;
|
||||
}
|
||||
|
||||
/**
|
||||
* 停止监听流上线
|
||||
*/
|
||||
public RedisRpcResponse unPushStreamOnlineEvent(RedisRpcRequest request) {
|
||||
StreamInfo streamInfo = JSONObject.parseObject(request.getParam().toString(), StreamInfo.class);
|
||||
log.info("[redis-rpc] 停止监听流上线: {}/{}", streamInfo.getApp(), streamInfo.getStream());
|
||||
// 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
|
||||
Hook hook = Hook.getInstance(HookType.on_media_arrival, streamInfo.getApp(), streamInfo.getStream(), null);
|
||||
hookSubscribe.removeSubscribe(hook);
|
||||
RedisRpcResponse response = request.getResponse();
|
||||
response.setStatusCode(200);
|
||||
return response;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 开始发流
|
||||
|
||||
@@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.service.redisMsg.service;
|
||||
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.genersoft.iot.vmp.common.CommonCallback;
|
||||
import com.genersoft.iot.vmp.common.StreamInfo;
|
||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
|
||||
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
|
||||
@@ -11,6 +12,7 @@ import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
|
||||
import com.genersoft.iot.vmp.media.event.hook.Hook;
|
||||
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
|
||||
import com.genersoft.iot.vmp.media.event.hook.HookType;
|
||||
import com.genersoft.iot.vmp.media.service.IMediaServerService;
|
||||
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
|
||||
@@ -38,6 +40,10 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
|
||||
@Autowired
|
||||
private RedisTemplate<Object, Object> redisTemplate;
|
||||
|
||||
|
||||
@Autowired
|
||||
private IMediaServerService mediaServerService;
|
||||
|
||||
private RedisRpcRequest buildRequest(String uri, Object param) {
|
||||
RedisRpcRequest request = new RedisRpcRequest();
|
||||
request.setFromId(userSetting.getServerId());
|
||||
@@ -146,4 +152,48 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
|
||||
public void removeCallback(long key) {
|
||||
redisRpcConfig.removeCallback(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long onStreamOnlineEvent(String app, String stream, CommonCallback<StreamInfo> callback) {
|
||||
|
||||
log.info("[请求所有WVP监听流上线] {}/{}", app, stream);
|
||||
// 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
|
||||
Hook hook = Hook.getInstance(HookType.on_media_arrival, app, stream, null);
|
||||
StreamInfo streamInfoParam = new StreamInfo();
|
||||
streamInfoParam.setApp(app);
|
||||
streamInfoParam.setStream(stream);
|
||||
RedisRpcRequest request = buildRequest("onPushStreamOnlineEvent", streamInfoParam);
|
||||
hookSubscribe.addSubscribe(hook, (hookData) -> {
|
||||
if (callback != null) {
|
||||
callback.run(mediaServerService.getStreamInfoByAppAndStream(hookData.getMediaServer(),
|
||||
app, stream, hookData.getMediaInfo(),
|
||||
hookData.getMediaInfo() != null ? hookData.getMediaInfo().getCallId() : null));
|
||||
}
|
||||
hookSubscribe.removeSubscribe(hook);
|
||||
redisRpcConfig.removeCallback(request.getSn());
|
||||
});
|
||||
|
||||
redisRpcConfig.request(request, response -> {
|
||||
if (response.getBody() == null) {
|
||||
log.info("[请求所有WVP监听流上线] 流上线,但是未找到发流信息:{}/{}", app, stream);
|
||||
return;
|
||||
}
|
||||
log.info("[请求所有WVP监听流上线] 流上线 {}/{}", app, stream);
|
||||
|
||||
if (callback != null) {
|
||||
callback.run((StreamInfo) response.getBody());
|
||||
}
|
||||
hookSubscribe.removeSubscribe(hook);
|
||||
});
|
||||
return request.getSn();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unPushStreamOnlineEvent(String app, String stream) {
|
||||
StreamInfo streamInfoParam = new StreamInfo();
|
||||
streamInfoParam.setApp(app);
|
||||
streamInfoParam.setStream(stream);
|
||||
RedisRpcRequest request = buildRequest("unPushStreamOnlineEvent", streamInfoParam);
|
||||
redisRpcConfig.request(request, 10);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user