添加第三方服务参与的推流直接转发到国标功能

This commit is contained in:
648540858
2021-12-04 17:27:23 +08:00
parent 52656bb893
commit 5b0b17d741
23 changed files with 324 additions and 77 deletions

View File

@@ -55,5 +55,8 @@ public class VideoManagerConstants {
public static final String MEDIA_TRANSACTION_USED_PREFIX = "VMP_MEDIA_TRANSACTION_";
//************************** redis 消息*********************************
public static final String WVP_MSG_STREAM_CHANGE__PREFIX = "WVP_MSG_STREAM_CHANGE_";
public static final String WVP_MSG_STREAM_CHANGE_PREFIX = "WVP_MSG_STREAM_CHANGE_";
//************************** 第三方 ****************************************
public static final String WVP_STREAM_GB_ID_PREFIX = "memberNo_";
}

View File

@@ -29,6 +29,8 @@ public class UserSetup {
private String serverId = "000000";
private String thirdPartyGBIdReg = "[\\s\\S]*";
private List<String> interfaceAuthenticationExcludes = new ArrayList<>();
public Boolean getSavePositionHistory() {
@@ -114,4 +116,12 @@ public class UserSetup {
public void setServerId(String serverId) {
this.serverId = serverId;
}
public String getThirdPartyGBIdReg() {
return thirdPartyGBIdReg;
}
public void setThirdPartyGBIdReg(String thirdPartyGBIdReg) {
this.thirdPartyGBIdReg = thirdPartyGBIdReg;
}
}

View File

@@ -104,6 +104,11 @@ public class ParentPlatform {
*/
private int channelCount;
/**
* 共享所有的直播流
*/
private boolean shareAllLiveStream;
public Integer getId() {
return id;
}
@@ -264,4 +269,12 @@ public class ParentPlatform {
this.channelCount = channelCount;
}
public boolean isShareAllLiveStream() {
return shareAllLiveStream;
}
public void setShareAllLiveStream(boolean shareAllLiveStream) {
this.shareAllLiveStream = shareAllLiveStream;
}
}

View File

@@ -31,6 +31,7 @@ import javax.sip.header.FromHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.List;
import java.util.Vector;
/**
@@ -105,7 +106,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
if (platform != null) {
// 查询平台下是否有该通道
DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId);
GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId);
List<GbStream> gbStreams = storager.queryStreamInParentPlatform(requesterId, channelId);
GbStream gbStream = gbStreams.size() > 0? gbStreams.get(0):null;
MediaServerItem mediaServerItem = null;
// 不是通道可能是直播流
if (channel != null && gbStream == null ) {

View File

@@ -1,21 +1,28 @@
package com.genersoft.iot.vmp.media.zlm;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.bean.ThirdPartyGB;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.StreamPushMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@Component
public class ZLMMediaListManager {
@@ -40,9 +47,15 @@ public class ZLMMediaListManager {
@Autowired
private IStreamPushService streamPushService;
@Autowired
private StreamPushMapper streamPushMapper;
@Autowired
private ZLMHttpHookSubscribe subscribe;
@Autowired
private UserSetup userSetup;
public void updateMediaList(MediaServerItem mediaServerItem) {
storager.clearMediaList();
@@ -89,7 +102,43 @@ public class ZLMMediaListManager {
}
public void addMedia(MediaItem mediaItem) {
storager.updateMedia(streamPushService.transform(mediaItem));
// 查找此直播流是否存在redis预设gbId
StreamPushItem transform = streamPushService.transform(mediaItem);
// 从streamId取出查询关键值
Pattern pattern = Pattern.compile(userSetup.getThirdPartyGBIdReg());
Matcher matcher = pattern.matcher(mediaItem.getStream());// 指定要匹配的字符串
String queryKey = null;
if (matcher.find()) { //此处find每次被调用后会偏移到下一个匹配
queryKey = matcher.group();
}
if (queryKey != null) {
ThirdPartyGB thirdPartyGB = redisCatchStorage.queryMemberNoGBId(queryKey);
if (thirdPartyGB != null && !StringUtils.isEmpty(thirdPartyGB.getNationalStandardNo())) {
transform.setGbId(thirdPartyGB.getNationalStandardNo());
transform.setName(thirdPartyGB.getName());
}
}
storager.updateMedia(transform);
if (!StringUtils.isEmpty(transform.getGbId())) {
// 如果这个国标ID已经给了其他推流且流已离线则移除其他推流
List<GbStream> gbStreams = gbStreamMapper.selectByGBId(transform.getGbId());
if (gbStreams.size() > 0) {
for (GbStream gbStream : gbStreams) {
// 出现使用相同国标Id的视频流时使用新流替换旧流
gbStreamMapper.del(gbStream.getApp(), gbStream.getStream());
platformGbStreamMapper.delByAppAndStream(gbStream.getApp(), gbStream.getStream());
if (!gbStream.isStatus()) {
streamPushMapper.del(gbStream.getApp(), gbStream.getStream());
}
}
}
if (gbStreamMapper.selectOne(transform.getApp(), transform.getStream()) != null) {
gbStreamMapper.update(transform);
}else {
gbStreamMapper.add(transform);
}
}
}

View File

@@ -37,4 +37,13 @@ public interface IStreamPushService {
StreamPushItem transform(MediaItem item);
StreamPushItem getPush(String app, String streamId);
/**
* 停止一路推流
* @param app 应用名
* @param streamId 流ID
* @return
*/
boolean stop(String app, String streamId);
}

View File

@@ -0,0 +1,23 @@
package com.genersoft.iot.vmp.service.bean;
public class ThirdPartyGB {
private String name;
private String nationalStandardNo;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getNationalStandardNo() {
return nationalStandardNo;
}
public void setNationalStandardNo(String nationalStandardNo) {
this.nationalStandardNo = nationalStandardNo;
}
}

View File

@@ -79,44 +79,38 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
StringBuffer result = new StringBuffer();
boolean streamLive = false;
param.setMediaServerId(mediaInfo.getId());
boolean saveResult;
// 更新
if (videoManagerStorager.queryStreamProxy(param.getApp(), param.getStream()) != null) {
if (videoManagerStorager.updateStreamProxy(param)) {
result.append("保存成功");
if (param.isEnable()){
JSONObject jsonObject = addStreamProxyToZlm(param);
if (jsonObject == null) {
result.append(", 但是启用失败,请检查流地址是否可用");
param.setEnable(false);
videoManagerStorager.updateStreamProxy(param);
}else {
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
mediaInfo, param.getApp(), param.getStream(), null);
wvpResult.setData(streamInfo);
}
}
}
saveResult = videoManagerStorager.updateStreamProxy(param);
}else { // 新增
if (videoManagerStorager.addStreamProxy(param)){
result.append("保存成功");
streamLive = true;
if (param.isEnable()) {
JSONObject jsonObject = addStreamProxyToZlm(param);
if (jsonObject == null) {
streamLive = false;
result.append(", 但是启用失败,请检查流地址是否可用");
param.setEnable(false);
videoManagerStorager.updateStreamProxy(param);
}else {
saveResult = videoManagerStorager.addStreamProxy(param);
}
if (saveResult) {
result.append("保存成功");
if (param.isEnable()) {
JSONObject jsonObject = addStreamProxyToZlm(param);
if (jsonObject == null) {
streamLive = false;
result.append(", 但是启用失败,请检查流地址是否可用");
param.setEnable(false);
videoManagerStorager.updateStreamProxy(param);
}else {
Integer code = jsonObject.getInteger("code");
if (code == 0) {
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
mediaInfo, param.getApp(), param.getStream(), null);
wvpResult.setData(streamInfo);
}else {
result.append(", 但是启用失败,请检查流地址是否可用");
param.setEnable(false);
videoManagerStorager.updateStreamProxy(param);
}
}
}else {
result.append("保存失败");
}
}
}
}else {
result.append("保存失败");
}
if (param.getPlatformGbId() != null && streamLive) {
List<GbStream> gbStreams = new ArrayList<>();

View File

@@ -12,6 +12,7 @@ import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.StreamPushMapper;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
@@ -32,6 +33,9 @@ public class StreamPushServiceImpl implements IStreamPushService {
@Autowired
private StreamPushMapper streamPushMapper;
@Autowired
private PlatformGbStreamMapper platformGbStreamMapper;
@Autowired
private ZLMRESTfulUtils zlmresTfulUtils;
@@ -116,4 +120,18 @@ public class StreamPushServiceImpl implements IStreamPushService {
return streamPushMapper.selectOne(app, streamId);
}
@Override
public boolean stop(String app, String streamId) {
StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId);
int delStream = streamPushMapper.del(app, streamId);
gbStreamMapper.del(app, streamId);
platformGbStreamMapper.delByAppAndStream(app, streamId);
if (delStream > 0) {
MediaServerItem mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId());
zlmresTfulUtils.closeStreams(mediaServerItem,app, streamId);
}
return true;
}
}

View File

@@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.bean.ThirdPartyGB;
import java.util.List;
import java.util.Map;
@@ -152,4 +153,11 @@ public interface IRedisCatchStorage {
boolean startDownload(StreamInfo streamInfo);
StreamInfo queryDownloadByStreamId(String streamId);
/**
* 查找第三方系统留下的国标预设值
* @param queryKey
* @return
*/
ThirdPartyGB queryMemberNoGBId(String queryKey);
}

View File

@@ -327,7 +327,7 @@ public interface IVideoManagerStorager {
* @param channelId
* @return
*/
GbStream queryStreamInParentPlatform(String platformId, String channelId);
List<GbStream> queryStreamInParentPlatform(String platformId, String channelId);
/**
* 获取平台关联的直播流

View File

@@ -40,10 +40,13 @@ public interface GbStreamMapper {
@Select("SELECT * FROM gb_stream WHERE app=#{app} AND stream=#{stream}")
StreamProxyItem selectOne(String app, String stream);
@Select("SELECT * FROM gb_stream WHERE gbId=#{gbId}")
List<GbStream> selectByGBId(String gbId);
@Select("SELECT gs.*, pgs.platformId FROM gb_stream gs " +
"LEFT JOIN platform_gb_stream pgs ON gs.app = pgs.app AND gs.stream = pgs.stream " +
"WHERE gs.gbId = '${gbId}' AND pgs.platformId = '${platformId}'")
GbStream queryStreamInPlatform(String platformId, String gbId);
List<GbStream> queryStreamInPlatform(String platformId, String gbId);
@Select("SELECT gs.*, pgs.platformId FROM gb_stream gs " +
"LEFT JOIN platform_gb_stream pgs ON gs.app = pgs.app AND gs.stream = pgs.stream " +

View File

@@ -15,10 +15,10 @@ public interface ParentPlatformMapper {
@Insert("INSERT INTO parent_platform (enable, name, serverGBId, serverGBDomain, serverIP, serverPort, deviceGBId, deviceIp, " +
" devicePort, username, password, expires, keepTimeout, transport, characterSet, ptz, rtcp, " +
" status) " +
" status, shareAllLiveStream) " +
" VALUES (${enable}, '${name}', '${serverGBId}', '${serverGBDomain}', '${serverIP}', ${serverPort}, '${deviceGBId}', '${deviceIp}', " +
" '${devicePort}', '${username}', '${password}', '${expires}', '${keepTimeout}', '${transport}', '${characterSet}', ${ptz}, ${rtcp}, " +
" ${status})")
" ${status}, ${shareAllLiveStream})")
int addParentPlatform(ParentPlatform parentPlatform);
@Update("UPDATE parent_platform " +
@@ -39,7 +39,8 @@ public interface ParentPlatformMapper {
"characterSet=#{characterSet}, " +
"ptz=#{ptz}, " +
"rtcp=#{rtcp}, " +
"status=#{status} " +
"status=#{status}, " +
"shareAllLiveStream=#{shareAllLiveStream} " +
"WHERE id=#{id}")
int updateParentPlatform(ParentPlatform parentPlatform);
@@ -70,4 +71,7 @@ public interface ParentPlatformMapper {
@Update("UPDATE parent_platform SET status=#{online} WHERE serverGBId=#{platformGbID}" )
int updateParentPlatformStatus(String platformGbID, boolean online);
@Select("SELECT * FROM parent_platform WHERE shareAllLiveStream=true")
List<ParentPlatform> selectAllAhareAllLiveStream();
}

View File

@@ -24,4 +24,7 @@ public interface PlatformGbStreamMapper {
@Select("SELECT * FROM platform_gb_stream WHERE app=#{app} AND stream=#{stream}")
List<StreamProxyItem> selectByAppAndStream(String app, String stream);
@Select("SELECT * FROM platform_gb_stream WHERE app=#{app} AND stream=#{stream} AND platformId=#{serverGBId}")
StreamProxyItem selectOne(String app, String stream, String serverGBId);
}

View File

@@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.bean.ThirdPartyGB;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
@@ -13,6 +14,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.text.SimpleDateFormat;
import java.util.*;
@@ -324,7 +326,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
@Override
public void sendStreamChangeMsg(String type, JSONObject jsonObject) {
String key = VideoManagerConstants.WVP_MSG_STREAM_CHANGE__PREFIX + type;
String key = VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + type;
logger.debug("[redis 流变化事件] {}: {}", key, jsonObject.toString());
redis.convertAndSend(key, jsonObject);
}
@@ -350,4 +352,11 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
if (playLeys == null || playLeys.size() == 0) return null;
return (StreamInfo)redis.get(playLeys.get(0).toString());
}
@Override
public ThirdPartyGB queryMemberNoGBId(String queryKey) {
String key = VideoManagerConstants.WVP_STREAM_GB_ID_PREFIX + queryKey;
JSONObject jsonObject = (JSONObject)redis.get(key);
return JSONObject.toJavaObject(jsonObject, ThirdPartyGB.class);
}
}

View File

@@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.storager.dao.*;
@@ -19,6 +20,7 @@ import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StringUtils;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -69,6 +71,16 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
@Autowired
private GbStreamMapper gbStreamMapper;
;
@Autowired
private PlatformGbStreamMapper platformGbStreamMapper;
@Autowired
private IGbStreamService gbStreamService;
@Autowired
private ParentPlatformMapper parentPlatformMapper;
@Autowired
private VideoStreamSessionManager streamSession;
@@ -356,6 +368,15 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
// 更新缓存
parentPlatformCatch.setParentPlatform(parentPlatform);
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
// 共享所有视频流,需要将现有视频流添加到此平台
List<GbStream> gbStreams = gbStreamMapper.selectAll();
if (gbStreams.size() > 0) {
if (parentPlatform.isShareAllLiveStream()) {
gbStreamService.addPlatformInfo(gbStreams, parentPlatform.getServerGBId());
}else {
gbStreamService.delPlatformInfo(gbStreams);
}
}
return result > 0;
}
@@ -561,7 +582,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
* @return
*/
@Override
public GbStream queryStreamInParentPlatform(String platformId, String gbId) {
public List<GbStream> queryStreamInParentPlatform(String platformId, String gbId) {
return gbStreamMapper.queryStreamInPlatform(platformId, gbId);
}
@@ -602,6 +623,22 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
streamPushMapper.del(streamPushItem.getApp(), streamPushItem.getStream());
streamPushMapper.add(streamPushItem);
gbStreamMapper.setStatus(streamPushItem.getApp(), streamPushItem.getStream(), true);
if(!StringUtils.isEmpty(streamPushItem.getGbId() )){
// 查找开启了全部直播流共享的上级平台
List<ParentPlatform> parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream();
if (parentPlatforms.size() > 0) {
for (ParentPlatform parentPlatform : parentPlatforms) {
streamPushItem.setPlatformId(parentPlatform.getServerGBId());
String stream = streamPushItem.getStream();
StreamProxyItem streamProxyItems = platformGbStreamMapper.selectOne(streamPushItem.getApp(), stream, parentPlatform.getServerGBId());
if (streamProxyItems == null) {
platformGbStreamMapper.add(streamPushItem);
}
}
}
}
}
@Override

View File

@@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.vmanager.streamPush;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageInfo;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
@@ -71,4 +72,21 @@ public class StreamPushController {
return "fail";
}
}
@ApiOperation("中止一个推流")
@ApiImplicitParams({
@ApiImplicitParam(name = "app", value = "应用名", required = true, dataTypeClass = String.class),
@ApiImplicitParam(name = "streamId", value = "流ID", required = true, dataTypeClass = String.class),
})
@PostMapping(value = "/stop")
@ResponseBody
public Object removeFormGB(@RequestParam(required = true)String app, @RequestParam(required = true)String streamId){
if (streamPushService.stop(app, streamId)){
return "success";
}else {
return "fail";
}
}
}