Merge branch 'refs/heads/2.7.0'

# Conflicts:
#	src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
#	src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
#	src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
#	src/main/java/com/genersoft/iot/vmp/media/zlm/ZlmHttpHookSubscribe.java
#	src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java
#	src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java
#	src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java
#	src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
#	src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
#	src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
#	src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
#	src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
#	src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
#	src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
This commit is contained in:
648540858
2024-04-23 20:59:20 +08:00
45 changed files with 2374 additions and 1528 deletions

View File

@@ -99,4 +99,13 @@ public interface IDeviceChannelService {
void updateChannelGPS(Device device, DeviceChannel deviceChannel, MobilePosition mobilePosition);
void stopPlay(String deviceId, String channelId);
void batchUpdateChannelGPS(List<DeviceChannel> channelList);
void batchAddMobilePosition(List<MobilePosition> addMobilePositionList);
void online(DeviceChannel channel);
void offline(DeviceChannel channel);
void delete(DeviceChannel channel);
}

View File

@@ -115,4 +115,5 @@ public interface IStreamPushService {
Map<String, StreamPushItem> getAllAppAndStreamMap();
void updatePush(OnStreamChangedHookParam param);
}

View File

@@ -61,6 +61,7 @@ public class MessageForPushChannel {
messageForPushChannel.setGbId(gbId);
messageForPushChannel.setApp(app);
messageForPushChannel.setStream(stream);
messageForPushChannel.setServerId(serverId);
messageForPushChannel.setMediaServerId(mediaServerId);
messageForPushChannel.setPlatFormId(platFormId);
messageForPushChannel.setPlatFormName(platFormName);

View File

@@ -23,6 +23,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
@@ -247,11 +248,27 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
return channelMapper.batchOnline(channels);
}
@Override
public void online(DeviceChannel channel) {
channelMapper.online(channel.getDeviceId(), channel.getChannelId());
}
@Override
public int channelsOffline(List<DeviceChannel> channels) {
return channelMapper.batchOffline(channels);
}
@Override
public void offline(DeviceChannel channel) {
channelMapper.offline(channel.getDeviceId(), channel.getChannelId());
}
@Override
public void delete(DeviceChannel channel) {
channelMapper.del(channel.getDeviceId(), channel.getChannelId());
}
@Override
public DeviceChannel getOne(String deviceId, String channelId){
return channelMapper.queryChannel(deviceId, channelId);
@@ -358,4 +375,47 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
public void stopPlay(String deviceId, String channelId) {
channelMapper.stopPlay(deviceId, channelId);
}
@Override
@Transactional
public void batchUpdateChannelGPS(List<DeviceChannel> channelList) {
for (DeviceChannel deviceChannel : channelList) {
deviceChannel.setUpdateTime(DateUtil.getNow());
if (deviceChannel.getGpsTime() == null) {
deviceChannel.setGpsTime(DateUtil.getNow());
}
}
int count = 1000;
if (channelList.size() > count) {
for (int i = 0; i < channelList.size(); i+=count) {
int toIndex = i+count;
if ( i + count > channelList.size()) {
toIndex = channelList.size();
}
List<DeviceChannel> channels = channelList.subList(i, toIndex);
channelMapper.batchUpdatePosition(channels);
}
}else {
channelMapper.batchUpdatePosition(channelList);
}
}
@Override
@Transactional
public void batchAddMobilePosition(List<MobilePosition> mobilePositions) {
// int count = 500;
// if (mobilePositions.size() > count) {
// for (int i = 0; i < mobilePositions.size(); i+=count) {
// int toIndex = i+count;
// if ( i + count > mobilePositions.size()) {
// toIndex = mobilePositions.size();
// }
// List<MobilePosition> mobilePositionsSub = mobilePositions.subList(i, toIndex);
// deviceMobilePositionMapper.batchadd(mobilePositionsSub);
// }
// }else {
// deviceMobilePositionMapper.batchadd(mobilePositions);
// }
deviceMobilePositionMapper.batchadd(mobilePositions);
}
}

View File

@@ -558,6 +558,7 @@ public class DeviceServiceImpl implements IDeviceService {
removeMobilePositionSubscribe(deviceInStore, result->{
// 开启订阅
deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval());
addMobilePositionSubscribe(deviceInStore);
// 因为是异步执行,需要在这里更新下数据
deviceMapper.updateCustom(deviceInStore);
@@ -566,12 +567,14 @@ public class DeviceServiceImpl implements IDeviceService {
}else {
// 开启订阅
deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval());
addMobilePositionSubscribe(deviceInStore);
}
}else if (device.getSubscribeCycleForMobilePosition() == 0) {
// 取消订阅
deviceInStore.setSubscribeCycleForMobilePosition(0);
deviceInStore.setMobilePositionSubmissionInterval(0);
removeMobilePositionSubscribe(deviceInStore, null);
}
}

View File

@@ -28,7 +28,6 @@ import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.*;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.CloudRecordUtils;
@@ -112,6 +111,14 @@ public class PlayServiceImpl implements IPlayService {
@Autowired
private RedisGbPlayMsgListener redisGbPlayMsgListener;
@Qualifier("taskExecutor")
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
@Autowired
private ZlmHttpHookSubscribe hookSubscribe;
@Autowired
private SSRCFactory ssrcFactory;
@@ -266,7 +273,6 @@ public class PlayServiceImpl implements IPlayService {
logger.warn("[点播] 未找到可用的zlm deviceId: {},channelId:{}", deviceId, channelId);
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");
}
Device device = redisCatchStorage.getDevice(deviceId);
if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE") && !mediaServerItem.isRtpEnable()) {
logger.warn("[点播] 单端口收流时不支持TCP主动方式收流 deviceId: {},channelId:{}", deviceId, channelId);
@@ -280,6 +286,8 @@ public class PlayServiceImpl implements IPlayService {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
if (inviteInfo != null ) {
if (inviteInfo.getStreamInfo() == null) {
// 释放生成的ssrc使用上一次申请的
ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc);
// 点播发起了但是尚未成功, 仅注册回调等待结果即可
inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
logger.info("[点播开始] 已经请求中,等待结果, deviceId: {}, channelId: {}", device.getDeviceId(), channelId);

View File

@@ -633,4 +633,21 @@ public class StreamPushServiceImpl implements IStreamPushService {
public Map<String, StreamPushItem> getAllAppAndStreamMap() {
return streamPushMapper.getAllAppAndStreamMap();
}
@Override
public void updatePush(OnStreamChangedHookParam param) {
StreamPushItem transform = transform(param);
StreamPushItem pushInDb = getPush(param.getApp(), param.getStream());
transform.setPushIng(param.isRegist());
transform.setUpdateTime(DateUtil.getNow());
transform.setPushTime(DateUtil.getNow());
transform.setSelf(userSetting.getServerId().equals(param.getSeverId()));
if (pushInDb == null) {
transform.setCreateTime(DateUtil.getNow());
streamPushMapper.add(transform);
}else {
streamPushMapper.update(transform);
gbStreamMapper.updateMediaServer(param.getApp(), param.getStream(), param.getMediaServerId());
}
}
}

View File

@@ -0,0 +1,22 @@
package com.genersoft.iot.vmp.service.redisMsg;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
public interface IRedisRpcService {
SendRtpItem getSendRtpItem(String sendRtpItemKey);
WVPResult startSendRtp(String sendRtpItemKey, SendRtpItem sendRtpItem);
WVPResult stopSendRtp(String sendRtpItemKey);
long waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<String> callback);
void stopWaitePushStreamOnline(SendRtpItem sendRtpItem);
void rtpSendStopped(String sendRtpItemKey);
void removeCallback(long key);
}

View File

@@ -1,451 +0,0 @@
package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.*;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.text.ParseException;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* 监听下级发送推送信息,并发送国标推流消息上级
* @author lin
*/
@Component
public class RedisGbPlayMsgListener implements MessageListener {
private final static Logger logger = LoggerFactory.getLogger(RedisGbPlayMsgListener.class);
public static final String WVP_PUSH_STREAM_KEY = "WVP_PUSH_STREAM";
/**
* 流媒体不存在的错误玛
*/
public static final int ERROR_CODE_MEDIA_SERVER_NOT_FOUND = -1;
/**
* 离线的错误玛
*/
public static final int ERROR_CODE_OFFLINE = -2;
/**
* 超时的错误玛
*/
public static final int ERROR_CODE_TIMEOUT = -3;
private final Map<String, PlayMsgCallback> callbacks = new ConcurrentHashMap<>();
private final Map<String, PlayMsgCallbackForStartSendRtpStream> callbacksForStartSendRtpStream = new ConcurrentHashMap<>();
private final Map<String, PlayMsgErrorCallback> callbacksForError = new ConcurrentHashMap<>();
@Autowired
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private DynamicTask dynamicTask;
@Autowired
private HookSubscribe subscribe;
private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
public interface PlayMsgCallback{
void handler(ResponseSendItemMsg responseSendItemMsg) throws ParseException;
}
public interface PlayMsgCallbackForStartSendRtpStream{
void handler();
}
public interface PlayMsgErrorCallback{
void handler(WVPResult wvpResult);
}
@Override
public void onMessage(Message message, byte[] bytes) {
boolean isEmpty = taskQueue.isEmpty();
taskQueue.offer(message);
if (isEmpty) {
taskExecutor.execute(() -> {
while (!taskQueue.isEmpty()) {
Message msg = taskQueue.poll();
try {
WvpRedisMsg wvpRedisMsg = JSON.parseObject(msg.getBody(), WvpRedisMsg.class);
logger.info("[收到REDIS通知] 消息: {}", JSON.toJSONString(wvpRedisMsg));
if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) {
continue;
}
if (WvpRedisMsg.isRequest(wvpRedisMsg)) {
logger.info("[收到REDIS通知] 请求: {}", new String(msg.getBody()));
switch (wvpRedisMsg.getCmd()){
case WvpRedisMsgCmd.GET_SEND_ITEM:
RequestSendItemMsg content = JSON.parseObject(wvpRedisMsg.getContent(), RequestSendItemMsg.class);
requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
break;
case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent());
requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
break;
case WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM:
RequestStopPushStreamMsg streamMsg = JSON.to(RequestStopPushStreamMsg.class, wvpRedisMsg.getContent());
requestStopPushStreamMsgHand(streamMsg, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
break;
default:
break;
}
}else {
logger.info("[收到REDIS通知] 回复: {}", new String(msg.getBody()));
switch (wvpRedisMsg.getCmd()){
case WvpRedisMsgCmd.GET_SEND_ITEM:
WVPResult content = JSON.to(WVPResult.class, wvpRedisMsg.getContent());
String key = wvpRedisMsg.getSerial();
switch (content.getCode()) {
case 0:
ResponseSendItemMsg responseSendItemMsg =JSON.to(ResponseSendItemMsg.class, content.getData());
PlayMsgCallback playMsgCallback = callbacks.get(key);
if (playMsgCallback != null) {
callbacksForError.remove(key);
try {
playMsgCallback.handler(responseSendItemMsg);
} catch (ParseException e) {
logger.error("[REDIS消息处理异常] ", e);
}
}
break;
case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
case ERROR_CODE_OFFLINE:
case ERROR_CODE_TIMEOUT:
PlayMsgErrorCallback errorCallback = callbacksForError.get(key);
if (errorCallback != null) {
callbacks.remove(key);
errorCallback.handler(content);
}
break;
default:
break;
}
break;
case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
WVPResult wvpResult = JSON.to(WVPResult.class, wvpRedisMsg.getContent());
String serial = wvpRedisMsg.getSerial();
switch (wvpResult.getCode()) {
case 0:
PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial);
if (playMsgCallback != null) {
callbacksForError.remove(serial);
playMsgCallback.handler();
}
break;
case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
case ERROR_CODE_OFFLINE:
case ERROR_CODE_TIMEOUT:
PlayMsgErrorCallback errorCallback = callbacksForError.get(serial);
if (errorCallback != null) {
callbacks.remove(serial);
errorCallback.handler(wvpResult);
}
break;
default:
break;
}
break;
default:
break;
}
}
}catch (Exception e) {
logger.warn("[RedisGbPlayMsg] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
logger.error("[RedisGbPlayMsg] 异常内容: ", e);
}
}
});
}
}
/**
* 处理收到的请求推流的请求
*/
private void requestPushStreamMsgHand(RequestPushStreamMsg requestPushStreamMsg, String fromId, String serial) {
MediaServer mediaServer = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId());
if (mediaServer == null) {
// TODO 回复错误
return;
}
SendRtpItem sendRtpItem = SendRtpItem.getInstance(requestPushStreamMsg);
try {
mediaServerService.startSendRtp(mediaServer, null, sendRtpItem);
}catch (ControllerException e) {
return;
}
// 回复消息
WVPResult<JSONObject> result = new WVPResult<>();
result.setCode(0);
WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), fromId,
WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, JSON.toJSONString(result));
JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
}
/**
* 处理收到的请求sendItem的请求
*/
private void requestSendItemMsgHand(RequestSendItemMsg content, String toId, String serial) {
MediaServer mediaServerItem = mediaServerService.getOne(content.getMediaServerId());
if (mediaServerItem == null) {
logger.info("[回复推流信息] 流媒体{}不存在 ", content.getMediaServerId());
WVPResult<SendRtpItem> result = new WVPResult<>();
result.setCode(ERROR_CODE_MEDIA_SERVER_NOT_FOUND);
result.setMsg("流媒体不存在");
WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId,
WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result));
JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
return;
}
// 确定流是否在线
Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, content.getApp(), content.getStream());
if (streamReady != null && streamReady) {
logger.info("[回复推流信息] {}/{}", content.getApp(), content.getStream());
responseSendItem(mediaServerItem, content, toId, serial);
}else {
// 流已经离线
// 发送redis消息以使设备上线
logger.info("[ app={}, stream={} ]通道离线发送redis信息控制设备开始推流",content.getApp(), content.getStream());
String taskKey = UUID.randomUUID().toString();
// 设置超时
dynamicTask.startDelay(taskKey, ()->{
logger.info("[ app={}, stream={} ] 等待设备开始推流超时", content.getApp(), content.getStream());
WVPResult<SendRtpItem> result = new WVPResult<>();
result.setCode(ERROR_CODE_TIMEOUT);
WvpRedisMsg response = WvpRedisMsg.getResponseInstance(
userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result)
);
JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
}, userSetting.getPlatformPlayTimeout());
// 添加订阅
Hook hook = Hook.getInstance(HookType.on_media_arrival, content.getApp(), content.getStream(), content.getMediaServerId());
subscribe.addSubscribe(hook, (hookData)->{
dynamicTask.stop(taskKey);
responseSendItem(mediaServerItem, content, toId, serial);
});
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, content.getApp(), content.getStream(),
content.getChannelId(), content.getPlatformId(), content.getPlatformName(), content.getServerId(),
content.getMediaServerId());
String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED;
logger.info("[redis发送通知] 推流被请求 {}: {}/{}", key, messageForPushChannel.getApp(), messageForPushChannel.getStream());
redisTemplate.convertAndSend(key, JSON.toJSON(messageForPushChannel));
}
}
/**
* 将获取到的sendItem发送出去
*/
private void responseSendItem(MediaServer mediaServerItem, RequestSendItemMsg content, String toId, String serial) {
SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, content.getIp(),
content.getPort(), content.getSsrc(), content.getPlatformId(),
content.getApp(), content.getStream(), content.getChannelId(),
content.getTcp(), content.getRtcp());
WVPResult<ResponseSendItemMsg> result = new WVPResult<>();
result.setCode(0);
ResponseSendItemMsg responseSendItemMsg = new ResponseSendItemMsg();
responseSendItemMsg.setSendRtpItem(sendRtpItem);
responseSendItemMsg.setMediaServerItem(mediaServerItem);
result.setData(responseSendItemMsg);
redisCatchStorage.updateSendRTPSever(sendRtpItem);
WvpRedisMsg response = WvpRedisMsg.getResponseInstance(
userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result)
);
JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
}
/**
* 发送消息要求下级生成推流信息
* @param serverId 下级服务ID
* @param app 应用名
* @param stream 流ID
* @param ip 目标IP
* @param port 目标端口
* @param ssrc ssrc
* @param platformId 平台国标编号
* @param channelId 通道ID
* @param isTcp 是否使用TCP
* @param callback 得到信息的回调
*/
public void sendMsg(String serverId, String mediaServerId, String app, String stream, String ip, int port, String ssrc,
String platformId, String channelId, boolean isTcp, boolean rtcp, String platformName, PlayMsgCallback callback, PlayMsgErrorCallback errorCallback) {
RequestSendItemMsg requestSendItemMsg = RequestSendItemMsg.getInstance(
serverId, mediaServerId, app, stream, ip, port, ssrc, platformId, channelId, isTcp, rtcp, platformName);
requestSendItemMsg.setServerId(serverId);
String key = UUID.randomUUID().toString();
WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, WvpRedisMsgCmd.GET_SEND_ITEM,
key, JSON.toJSONString(requestSendItemMsg));
JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
logger.info("[请求推流SendItem] {}: {}", serverId, jsonObject);
callbacks.put(key, callback);
callbacksForError.put(key, errorCallback);
dynamicTask.startDelay(key, ()->{
callbacks.remove(key);
callbacksForError.remove(key);
WVPResult<Object> wvpResult = new WVPResult<>();
wvpResult.setCode(ERROR_CODE_TIMEOUT);
wvpResult.setMsg("timeout");
errorCallback.handler(wvpResult);
}, userSetting.getPlatformPlayTimeout());
redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
}
/**
* 发送请求推流的消息
* @param param 推流参数
* @param callback 回调
*/
public void sendMsgForStartSendRtpStream(String serverId, RequestPushStreamMsg param, PlayMsgCallbackForStartSendRtpStream callback) {
String key = UUID.randomUUID().toString();
WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId,
WvpRedisMsgCmd.REQUEST_PUSH_STREAM, key, JSON.toJSONString(param));
JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
logger.info("[REDIS 请求其他平台推流] {}: {}", serverId, jsonObject);
dynamicTask.startDelay(key, ()->{
callbacksForStartSendRtpStream.remove(key);
callbacksForError.remove(key);
}, userSetting.getPlatformPlayTimeout());
callbacksForStartSendRtpStream.put(key, callback);
callbacksForError.put(key, (wvpResult)->{
logger.info("[REDIS 请求其他平台推流] 失败: {}", wvpResult.getMsg());
callbacksForStartSendRtpStream.remove(key);
callbacksForError.remove(key);
});
redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
}
/**
* 发送请求推流的消息
*/
public void sendMsgForStopSendRtpStream(String serverId, RequestStopPushStreamMsg streamMsg) {
String key = UUID.randomUUID().toString();
WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId,
WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM, key, JSON.toJSONString(streamMsg));
JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
logger.info("[REDIS 请求其他平台停止推流] {}: {}", serverId, jsonObject);
redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
}
private SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) {
if (platformGbId == null) {
platformGbId = "*";
}
if (channelId == null) {
channelId = "*";
}
if (streamId == null) {
streamId = "*";
}
if (callId == null) {
callId = "*";
}
String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
+ userSetting.getServerId() + "_*_"
+ platformGbId + "_"
+ channelId + "_"
+ streamId + "_"
+ callId;
List<Object> scan = RedisUtil.scan(redisTemplate, key);
if (scan.size() > 0) {
return (SendRtpItem)redisTemplate.opsForValue().get(scan.get(0));
}else {
return null;
}
}
/**
* 处理收到的请求推流的请求
*/
private void requestStopPushStreamMsgHand(RequestStopPushStreamMsg streamMsg, String fromId, String serial) {
SendRtpItem sendRtpItem = streamMsg.getSendRtpItem();
if (sendRtpItem == null) {
logger.info("[REDIS 执行其他平台的请求停止推流] 失败: sendRtpItem为NULL");
return;
}
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
if (mediaInfo == null) {
// TODO 回复错误
return;
}
if (mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc())) {
logger.info("[REDIS 执行其他平台的请求停止推流] 成功: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
// 发送redis消息
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
sendRtpItem.getPlatformId(), streamMsg.getPlatformName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
messageForPushChannel.setPlatFormIndex(streamMsg.getPlatFormIndex());
redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
}
}
}

View File

@@ -1,111 +0,0 @@
package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 接收redis发送的结束推流请求
* @author lin
*/
@Component
public class RedisPushStreamCloseResponseListener implements MessageListener {
private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamCloseResponseListener.class);
@Autowired
private IStreamPushService streamPushService;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private IVideoManagerStorage storager;
@Autowired
private ISIPCommanderForPlatform commanderFroPlatform;
@Autowired
private UserSetting userSetting;
@Autowired
private IMediaServerService mediaServerService;
private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
public interface PushStreamResponseEvent{
void run(MessageForPushChannelResponse response);
}
@Override
public void onMessage(Message message, byte[] bytes) {
logger.info("[REDIS消息-推流结束] {}", new String(message.getBody()));
MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class);
StreamPushItem push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream());
if (push != null) {
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(
push.getGbId());
if (!sendRtpItems.isEmpty()) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
if (parentPlatform != null) {
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStream());
try {
commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
}
}
if (push.isSelf()) {
// 停止向上级推流
logger.info("[REDIS消息-推流结束] 停止向上级推流:{}", sendRtpItem.getStream());
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStream());
mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
}
}
}
}
}
}
public void addEvent(String app, String stream, PushStreamResponseEvent callback) {
responseEvents.put(app + stream, callback);
}
public void removeEvent(String app, String stream) {
responseEvents.remove(app + stream);
}
}

View File

@@ -1,97 +0,0 @@
package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.dto.ChannelOnlineEvent;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.text.ParseException;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* 接收其他wvp发送流变化通知
* @author lin
*/
@Component
public class RedisStreamMsgListener implements MessageListener {
private final static Logger logger = LoggerFactory.getLogger(RedisStreamMsgListener.class);
@Autowired
private UserSetting userSetting;
@Autowired
private ZLMMediaListManager zlmMediaListManager;
private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
@Override
public void onMessage(Message message, byte[] bytes) {
boolean isEmpty = taskQueue.isEmpty();
taskQueue.offer(message);
if (isEmpty) {
taskExecutor.execute(() -> {
while (!taskQueue.isEmpty()) {
Message msg = taskQueue.poll();
try {
JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class);
if (steamMsgJson == null) {
logger.warn("[收到redis 流变化]消息解析失败");
continue;
}
String serverId = steamMsgJson.getString("serverId");
if (userSetting.getServerId().equals(serverId)) {
// 自己发送的消息忽略即可
continue;
}
logger.info("[收到redis 流变化] {}", new String(message.getBody()));
String app = steamMsgJson.getString("app");
String stream = steamMsgJson.getString("stream");
boolean register = steamMsgJson.getBoolean("register");
String mediaServerId = steamMsgJson.getString("mediaServerId");
OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam();
onStreamChangedHookParam.setSeverId(serverId);
onStreamChangedHookParam.setApp(app);
onStreamChangedHookParam.setStream(stream);
onStreamChangedHookParam.setRegist(register);
onStreamChangedHookParam.setMediaServerId(mediaServerId);
onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000);
onStreamChangedHookParam.setAliveSecond(0L);
onStreamChangedHookParam.setTotalReaderCount(0);
onStreamChangedHookParam.setOriginType(0);
onStreamChangedHookParam.setOriginTypeStr("0");
onStreamChangedHookParam.setOriginTypeStr("unknown");
ChannelOnlineEvent channelOnlineEventLister = zlmMediaListManager.getChannelOnlineEventLister(app, stream);
if ( channelOnlineEventLister != null) {
try {
channelOnlineEventLister.run(app, stream, serverId);;
} catch (ParseException e) {
logger.error("addPush: ", e);
}
zlmMediaListManager.removedChannelOnlineEventLister(app, stream);
}
}catch (Exception e) {
logger.warn("[REDIS消息-流变化] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
logger.error("[REDIS消息-流变化] 异常内容: ", e);
}
}
});
}
}
}

View File

@@ -0,0 +1,304 @@
package com.genersoft.iot.vmp.service.redisMsg.control;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
/**
* 其他wvp发起的rpc调用这里的方法被 RedisRpcConfig 通过反射寻找对应的方法名称调用
*/
@Component
public class RedisRpcController {
private final static Logger logger = LoggerFactory.getLogger(RedisRpcController.class);
@Autowired
private SSRCFactory ssrcFactory;
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private SendRtpPortManager sendRtpPortManager;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private UserSetting userSetting;
@Autowired
private ZlmHttpHookSubscribe hookSubscribe;
@Autowired
private ZLMServerFactory zlmServerFactory;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@Autowired
private ISIPCommanderForPlatform commanderFroPlatform;
@Autowired
private IVideoManagerStorage storager;
/**
* 获取发流的信息
*/
public RedisRpcResponse getSendRtpItem(RedisRpcRequest request) {
String sendRtpItemKey = request.getParam().toString();
SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey);
if (sendRtpItem == null) {
logger.info("[redis-rpc] 获取发流的信息, 未找到redis中的发流信息 key{}", sendRtpItemKey);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
return response;
}
logger.info("[redis-rpc] 获取发流的信息: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
// 查询本级是否有这个流
MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream());
if (mediaServerItem == null) {
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
}
// 自平台内容
int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
if (localPort == 0) {
logger.info("[redis-rpc] getSendRtpItem->服务器端口资源不足" );
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
}
// 写入redis 超时时回复
sendRtpItem.setStatus(1);
sendRtpItem.setServerId(userSetting.getServerId());
sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
if (sendRtpItem.getSsrc() == null) {
// 上级平台点播时不使用上级平台指定的ssrc使用自定义的ssrc参考国标文档-点播外域设备媒体流SSRC处理方式
String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
sendRtpItem.setSsrc(ssrc);
}
redisCatchStorage.updateSendRTPSever(sendRtpItem);
redisTemplate.opsForValue().set(sendRtpItemKey, sendRtpItem);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
response.setBody(sendRtpItemKey);
return response;
}
/**
* 监听流上线
*/
public RedisRpcResponse waitePushStreamOnline(RedisRpcRequest request) {
SendRtpItem sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpItem.class);
logger.info("[redis-rpc] 监听流上线: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
// 查询本级是否有这个流
MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream());
if (mediaServerItem != null) {
logger.info("[redis-rpc] 监听流上线时发现流已存在直接返回: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
// 读取redis中的上级点播信息生成sendRtpItm发送出去
if (sendRtpItem.getSsrc() == null) {
// 上级平台点播时不使用上级平台指定的ssrc使用自定义的ssrc参考国标文档-点播外域设备媒体流SSRC处理方式
String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
sendRtpItem.setSsrc(ssrc);
}
sendRtpItem.setMediaServerId(mediaServerItem.getId());
sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
sendRtpItem.setServerId(userSetting.getServerId());
redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem);
RedisRpcResponse response = request.getResponse();
response.setBody(sendRtpItem.getRedisKey());
response.setStatusCode(200);
}
// 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
logger.info("[redis-rpc] 监听流上线,流已上线: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
// 读取redis中的上级点播信息生成sendRtpItm发送出去
if (sendRtpItem.getSsrc() == null) {
// 上级平台点播时不使用上级平台指定的ssrc使用自定义的ssrc参考国标文档-点播外域设备媒体流SSRC处理方式
String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId());
sendRtpItem.setSsrc(ssrc);
}
sendRtpItem.setMediaServerId(mediaServerItemInUse.getId());
sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp());
sendRtpItem.setServerId(userSetting.getServerId());
redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem);
RedisRpcResponse response = request.getResponse();
response.setBody(sendRtpItem.getRedisKey());
response.setStatusCode(200);
// 手动发送结果
sendResponse(response);
hookSubscribe.removeSubscribe(hook);
});
return null;
}
/**
* 停止监听流上线
*/
public RedisRpcResponse stopWaitePushStreamOnline(RedisRpcRequest request) {
SendRtpItem sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpItem.class);
logger.info("[redis-rpc] 停止监听流上线: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
// 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
hookSubscribe.removeSubscribe(hook);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
return response;
}
/**
* 开始发流
*/
public RedisRpcResponse startSendRtp(RedisRpcRequest request) {
String sendRtpItemKey = request.getParam().toString();
SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
if (sendRtpItem == null) {
logger.info("[redis-rpc] 开始发流, 未找到redis中的发流信息 key{}", sendRtpItemKey);
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息");
response.setBody(wvpResult);
return response;
}
logger.info("[redis-rpc] 开始发流: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
if (mediaServerItem == null) {
logger.info("[redis-rpc] startSendRtp->未找到MediaServer {}", sendRtpItem.getMediaServerId() );
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer");
response.setBody(wvpResult);
return response;
}
Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
if (!streamReady) {
logger.info("[redis-rpc] startSendRtp->流不在线: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "流不在线");
response.setBody(wvpResult);
return response;
}
JSONObject jsonObject = zlmServerFactory.startSendRtp(mediaServerItem, sendRtpItem);
if (jsonObject.getInteger("code") == 0) {
logger.info("[redis-rpc] 发流成功: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
WVPResult wvpResult = WVPResult.success();
response.setBody(wvpResult);
}else {
logger.info("[redis-rpc] 发流失败: {}/{}, 目标地址: {}{} {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), jsonObject);
WVPResult wvpResult = WVPResult.fail(jsonObject.getInteger("code"), jsonObject.getString("msg"));
response.setBody(wvpResult);
}
return response;
}
/**
* 停止发流
*/
public RedisRpcResponse stopSendRtp(RedisRpcRequest request) {
String sendRtpItemKey = request.getParam().toString();
SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
if (sendRtpItem == null) {
logger.info("[redis-rpc] 停止推流, 未找到redis中的发流信息 key{}", sendRtpItemKey);
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息");
response.setBody(wvpResult);
return response;
}
logger.info("[redis-rpc] 停止推流: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
if (mediaServerItem == null) {
logger.info("[redis-rpc] stopSendRtp->未找到MediaServer {}", sendRtpItem.getMediaServerId() );
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer");
response.setBody(wvpResult);
return response;
}
JSONObject jsonObject = zlmServerFactory.stopSendRtpStream(mediaServerItem, sendRtpItem);
if (jsonObject.getInteger("code") == 0) {
logger.info("[redis-rpc] 停止推流成功: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
response.setBody(WVPResult.success());
return response;
}else {
int code = jsonObject.getInteger("code");
String msg = jsonObject.getString("msg");
logger.info("[redis-rpc] 停止推流失败: {}/{}, 目标地址: {}{} code {}, msg: {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), code, msg );
response.setBody(WVPResult.fail(code, msg));
return response;
}
}
/**
* 其他wvp通知推流已经停止了
*/
public RedisRpcResponse rtpSendStopped(RedisRpcRequest request) {
String sendRtpItemKey = request.getParam().toString();
SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
if (sendRtpItem == null) {
logger.info("[redis-rpc] 推流已经停止, 未找到redis中的发流信息 key{}", sendRtpItemKey);
return response;
}
logger.info("[redis-rpc] 推流已经停止: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
String platformId = sendRtpItem.getPlatformId();
ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
if (platform == null) {
return response;
}
try {
commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
sendRtpItem.getCallId(), sendRtpItem.getStream());
redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 发送BYE: {}", e.getMessage());
}
return response;
}
private void sendResponse(RedisRpcResponse response){
logger.info("[redis-rpc] >> {}", response);
response.setToId(userSetting.getServerId());
RedisRpcMessage message = new RedisRpcMessage();
message.setResponse(response);
redisTemplate.convertAndSend(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY, message);
}
}

View File

@@ -0,0 +1,155 @@
package com.genersoft.iot.vmp.service.redisMsg.service;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
@Service
public class RedisRpcServiceImpl implements IRedisRpcService {
private final static Logger logger = LoggerFactory.getLogger(RedisRpcServiceImpl.class);
@Autowired
private RedisRpcConfig redisRpcConfig;
@Autowired
private UserSetting userSetting;
@Autowired
private ZlmHttpHookSubscribe hookSubscribe;
@Autowired
private SSRCFactory ssrcFactory;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisRpcRequest buildRequest(String uri, Object param) {
RedisRpcRequest request = new RedisRpcRequest();
request.setFromId(userSetting.getServerId());
request.setParam(param);
request.setUri(uri);
return request;
}
@Override
public SendRtpItem getSendRtpItem(String sendRtpItemKey) {
RedisRpcRequest request = buildRequest("getSendRtpItem", sendRtpItemKey);
RedisRpcResponse response = redisRpcConfig.request(request, 10);
if (response.getBody() == null) {
return null;
}
return (SendRtpItem)redisTemplate.opsForValue().get(response.getBody().toString());
}
@Override
public WVPResult startSendRtp(String sendRtpItemKey, SendRtpItem sendRtpItem) {
logger.info("[请求其他WVP] 开始推流wvp{} {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream());
RedisRpcRequest request = buildRequest("startSendRtp", sendRtpItemKey);
request.setToId(sendRtpItem.getServerId());
RedisRpcResponse response = redisRpcConfig.request(request, 10);
return JSON.parseObject(response.getBody().toString(), WVPResult.class);
}
@Override
public WVPResult stopSendRtp(String sendRtpItemKey) {
SendRtpItem sendRtpItem = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey);
if (sendRtpItem == null) {
logger.info("[请求其他WVP] 停止推流, 未找到redis中的发流信息 key{}", sendRtpItemKey);
return WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到发流信息");
}
logger.info("[请求其他WVP] 停止推流wvp{} {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream());
RedisRpcRequest request = buildRequest("stopSendRtp", sendRtpItemKey);
request.setToId(sendRtpItem.getServerId());
RedisRpcResponse response = redisRpcConfig.request(request, 10);
return JSON.parseObject(response.getBody().toString(), WVPResult.class);
}
@Override
public long waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<String> callback) {
logger.info("[请求所有WVP监听流上线] {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
// 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
RedisRpcRequest request = buildRequest("waitePushStreamOnline", sendRtpItem);
request.setToId(sendRtpItem.getServerId());
hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
// 读取redis中的上级点播信息生成sendRtpItm发送出去
if (sendRtpItem.getSsrc() == null) {
// 上级平台点播时不使用上级平台指定的ssrc使用自定义的ssrc参考国标文档-点播外域设备媒体流SSRC处理方式
String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId());
sendRtpItem.setSsrc(ssrc);
}
sendRtpItem.setMediaServerId(mediaServerItemInUse.getId());
sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp());
sendRtpItem.setServerId(userSetting.getServerId());
redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem);
if (callback != null) {
callback.run(sendRtpItem.getRedisKey());
}
hookSubscribe.removeSubscribe(hook);
redisRpcConfig.removeCallback(request.getSn());
});
redisRpcConfig.request(request, response -> {
if (response.getBody() == null) {
logger.info("[请求所有WVP监听流上线] 流上线,但是未找到发流信息:{}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
return;
}
logger.info("[请求所有WVP监听流上线] 流上线 {}/{}->{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.toString());
if (callback != null) {
callback.run(response.getBody().toString());
}
hookSubscribe.removeSubscribe(hook);
});
return request.getSn();
}
@Override
public void stopWaitePushStreamOnline(SendRtpItem sendRtpItem) {
logger.info("[停止WVP监听流上线] {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
hookSubscribe.removeSubscribe(hook);
RedisRpcRequest request = buildRequest("stopWaitePushStreamOnline", sendRtpItem);
request.setToId(sendRtpItem.getServerId());
redisRpcConfig.request(request, 10);
}
@Override
public void rtpSendStopped(String sendRtpItemKey) {
SendRtpItem sendRtpItem = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey);
if (sendRtpItem == null) {
logger.info("[停止WVP监听流上线] 未找到redis中的发流信息 key{}", sendRtpItemKey);
return;
}
RedisRpcRequest request = buildRequest("rtpSendStopped", sendRtpItemKey);
request.setToId(sendRtpItem.getServerId());
redisRpcConfig.request(request, 10);
}
@Override
public void removeCallback(long key) {
redisRpcConfig.removeCallback(key);
}
}