添加拉流代理与国标关联, 支持代理rtsp/rtmp/...,转发到国标

This commit is contained in:
panlinlin
2021-04-01 18:06:21 +08:00
parent 56859d09df
commit 7dc8fd4a1e
42 changed files with 1394 additions and 224 deletions

View File

@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONArray;
public class StreamInfo {
private String app;
private String streamId;
private String deviceID;
private String channelId;
@@ -19,6 +20,14 @@ public class StreamInfo {
private String rtsp;
private JSONArray tracks;
public String getApp() {
return app;
}
public void setApp(String app) {
this.app = app;
}
public String getDeviceID() {
return deviceID;
}

View File

@@ -0,0 +1,71 @@
package com.genersoft.iot.vmp.gb28181.bean;
/**
* 直播流关联国标上级平台
*/
public class GbStream extends PlatformGbStream{
private String app;
private String stream;
private String gbId;
private String name;
private double longitude;
private double latitude;
private String streamType;
public String getApp() {
return app;
}
public void setApp(String app) {
this.app = app;
}
public String getStream() {
return stream;
}
public void setStream(String stream) {
this.stream = stream;
}
public String getGbId() {
return gbId;
}
public void setGbId(String gbId) {
this.gbId = gbId;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public double getLongitude() {
return longitude;
}
public void setLongitude(double longitude) {
this.longitude = longitude;
}
public double getLatitude() {
return latitude;
}
public void setLatitude(double latitude) {
this.latitude = latitude;
}
public String getStreamType() {
return streamType;
}
public void setStreamType(String streamType) {
this.streamType = streamType;
}
}

View File

@@ -0,0 +1,31 @@
package com.genersoft.iot.vmp.gb28181.bean;
public class PlatformGbStream {
private String app;
private String stream;
private String platformId;
public String getApp() {
return app;
}
public void setApp(String app) {
this.app = app;
}
public String getStream() {
return stream;
}
public void setStream(String stream) {
this.stream = stream;
}
public String getPlatformId() {
return platformId;
}
public void setPlatformId(String platformId) {
this.platformId = platformId;
}
}

View File

@@ -27,6 +27,11 @@ public class SendRtpItem {
*/
private String deviceId;
/**
* 直播流的应用名
*/
private String app;
/**
* 通道id
*/
@@ -40,10 +45,6 @@ public class SendRtpItem {
*/
private int status = 0;
/**
* 设备推流的app
*/
private String app = "rtp";
/**
* 设备推流的streamId

View File

@@ -91,7 +91,9 @@ public class SIPCommander implements ISIPCommander {
@Autowired
private SipSubscribe sipSubscribe;
public SipConfig getSipConfig() {
return sipConfig;
}
/**
* 云台方向放控制,使用配置文件中的默认镜头移动速度

View File

@@ -197,7 +197,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
@Override
public boolean catalogQuery(DeviceChannel channel, ParentPlatform parentPlatform, String sn, String fromTag, int size) {
if (channel == null || parentPlatform ==null) {
if ( parentPlatform ==null) {
return false;
}
try {
@@ -210,20 +210,22 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
catalogXml.append("<SumNum>" + size + "</SumNum>\r\n");
catalogXml.append("<DeviceList Num=\"1\">\r\n");
catalogXml.append("<Item>\r\n");
if (channel != null) {
catalogXml.append("<DeviceID>" + channel.getChannelId() + "</DeviceID>\r\n");
catalogXml.append("<Name>" + channel.getName() + "</Name>\r\n");
catalogXml.append("<Manufacturer>" + channel.getManufacture() + "</Manufacturer>\r\n");
catalogXml.append("<Model>" + channel.getModel() + "</Model>\r\n");
catalogXml.append("<Owner>" + channel.getOwner() + "</Owner>\r\n");
catalogXml.append("<CivilCode>" + channel.getCivilCode() + "</CivilCode>\r\n");
catalogXml.append("<Address>" + channel.getAddress() + "</Address>\r\n");
catalogXml.append("<Parental>" + channel.getParental() + "</Parental>\r\n");// TODO 当前不能添加分组, 所以暂时没有父节点
catalogXml.append("<ParentID>" + channel.getParentId() + "</ParentID>\r\n"); // TODO 当前不能添加分组, 所以暂时没有父节点
catalogXml.append("<Secrecy>" + channel.getSecrecy() + "</Secrecy>\r\n");
catalogXml.append("<RegisterWay>" + channel.getRegisterWay() + "</RegisterWay>\r\n");
catalogXml.append("<Status>" + (channel.getStatus() == 0?"OFF":"ON") + "</Status>\r\n");
catalogXml.append("<Info></Info>\r\n");
}
catalogXml.append("<DeviceID>" + channel.getChannelId() + "</DeviceID>\r\n");
catalogXml.append("<Name>" + channel.getName() + "</Name>\r\n");
catalogXml.append("<Manufacturer>" + channel.getManufacture() + "</Manufacturer>\r\n");
catalogXml.append("<Model>" + channel.getModel() + "</Model>\r\n");
catalogXml.append("<Owner>" + channel.getOwner() + "</Owner>\r\n");
catalogXml.append("<CivilCode>" + channel.getCivilCode() + "</CivilCode>\r\n");
catalogXml.append("<Address>" + channel.getAddress() + "</Address>\r\n");
catalogXml.append("<Parental>" + channel.getParental() + "</Parental>\r\n");// TODO 当前不能添加分组, 所以暂时没有父节点
catalogXml.append("<ParentID>" + channel.getParentId() + "</ParentID>\r\n"); // TODO 当前不能添加分组, 所以暂时没有父节点
catalogXml.append("<Secrecy>" + channel.getSecrecy() + "</Secrecy>\r\n");
catalogXml.append("<RegisterWay>" + channel.getRegisterWay() + "</RegisterWay>\r\n");
catalogXml.append("<Status>" + (channel.getStatus() == 0?"OFF":"ON") + "</Status>\r\n");
catalogXml.append("<Info></Info>\r\n");
catalogXml.append("</Item>\r\n");
catalogXml.append("</DeviceList>\r\n");

View File

@@ -43,14 +43,23 @@ public class AckRequestProcessor extends SIPRequestAbstractProcessor {
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId);
String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
String deviceId = sendRtpItem.getDeviceId();
StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
sendRtpItem.setStreamId(streamInfo.getStreamId());
StreamInfo streamInfo = null;
if (deviceId == null) {
streamInfo = new StreamInfo();
streamInfo.setApp(sendRtpItem.getApp());
streamInfo.setStreamId(sendRtpItem.getStreamId());
}else {
streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
sendRtpItem.setStreamId(streamInfo.getStreamId());
streamInfo.setApp("rtp");
}
redisCatchStorage.updateSendRTPSever(sendRtpItem);
System.out.println(platformGbId);
System.out.println(channelId);
Map<String, Object> param = new HashMap<>();
param.put("vhost","__defaultVhost__");
param.put("app","rtp");
param.put("app",streamInfo.getApp());
param.put("stream",streamInfo.getStreamId());
param.put("ssrc", sendRtpItem.getSsrc());
param.put("dst_url",sendRtpItem.getIp());
@@ -63,7 +72,7 @@ public class AckRequestProcessor extends SIPRequestAbstractProcessor {
while (!rtpPushed) {
try {
if (System.currentTimeMillis() - startTime < 30 * 1000) {
if (zlmrtpServerFactory.isRtpReady(streamInfo.getStreamId())) {
if (zlmrtpServerFactory.isStreamReady(streamInfo.getApp(), streamInfo.getStreamId())) {
rtpPushed = true;
System.out.println("已获取设备推流,开始向上级推流");
zlmrtpServerFactory.startSendRtpStream(param);

View File

@@ -51,12 +51,12 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor {
String streamId = sendRtpItem.getStreamId();
Map<String, Object> param = new HashMap<>();
param.put("vhost","__defaultVhost__");
param.put("app","rtp");
param.put("app",sendRtpItem.getApp());
param.put("stream",streamId);
System.out.println("停止向上级推流:" + streamId);
zlmrtpServerFactory.stopSendRtpStream(param);
redisCatchStorage.deleteSendRTPServer(platformGbId, channelId);
if (zlmrtpServerFactory.totalReaderCount(streamId) == 0) {
if (zlmrtpServerFactory.totalReaderCount(sendRtpItem.getApp(), streamId) == 0) {
System.out.println(streamId + "无其它观看者,通知设备停止推流");
cmder.streamByeCmd(streamId);
}

View File

@@ -12,13 +12,12 @@ import javax.sip.message.Request;
import javax.sip.message.Response;
import com.genersoft.iot.vmp.conf.MediaServerConfig;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
@@ -30,6 +29,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.util.HashMap;
import java.util.Map;
import java.util.Vector;
/**
@@ -93,12 +94,14 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor {
if (platform != null) {
// 查询平台下是否有该通道
DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId);
if (channel == null) {
GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId);
// 不是通道可能是直播流
if (channel != null || gbStream != null ) {
responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在发181呼叫转接中
}else {
logger.info("通道不存在返回404");
responseAck(evt, Response.NOT_FOUND); // 通道不存在发404资源不存在
return;
}else {
responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在发181呼叫转接中
}
// 解析sdp消息, 使用jainsip 自带的sdp解析方式
String contentString = new String(request.getRawContent());
@@ -153,67 +156,120 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor {
String addressStr = sdp.getOrigin().getAddress();
//String sessionName = sdp.getSessionName().getValue();
logger.info("[上级点播]用户:{} 地址:{}:{} ssrc{}", username, addressStr, port, ssrc);
Device device = null;
// 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标
if (channel != null) {
device = storager.queryVideoDeviceByPlatformIdAndChannelId(requesterId, channelId);
if (device == null) {
logger.warn("点播平台{}的通道{}时未找到设备信息", requesterId, channel);
responseAck(evt, Response.SERVER_INTERNAL_ERROR);
return;
}
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(addressStr, port, ssrc, requesterId,
device.getDeviceId(), channelId,
mediaTransmissionTCP);
if (tcpActive != null) {
sendRtpItem.setTcpActive(tcpActive);
}
if (sendRtpItem == null) {
logger.warn("服务器端口资源不足");
responseAck(evt, Response.BUSY_HERE);
return;
}
Device device = storager.queryVideoDeviceByPlatformIdAndChannelId(requesterId, channelId);
if (device == null) {
logger.warn("点播平台{}的通道{}时未找到设备信息", requesterId, channel);
responseAck(evt, Response.SERVER_INTERNAL_ERROR);
return;
}
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(addressStr, port, ssrc, requesterId, device.getDeviceId(), channelId,
mediaTransmissionTCP);
if (tcpActive != null) {
sendRtpItem.setTcpActive(tcpActive);
}
if (sendRtpItem == null) {
logger.warn("服务器端口资源不足");
responseAck(evt, Response.BUSY_HERE);
return;
}
// 写入redis 超时时回复
redisCatchStorage.updateSendRTPSever(sendRtpItem);
// 通知下级推流,
PlayResult playResult = playService.play(device.getDeviceId(), channelId, (responseJSON)->{
// 收到推流, 回复200OK, 等待ack
sendRtpItem.setStatus(1);
// 写入redis 超时时回复
redisCatchStorage.updateSendRTPSever(sendRtpItem);
// TODO 添加对tcp的支持
MediaServerConfig mediaInfo = redisCatchStorage.getMediaInfo();
StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n");
content.append("o="+"00000"+" 0 0 IN IP4 "+mediaInfo.getWanIp()+"\r\n");
content.append("s=Play\r\n");
content.append("c=IN IP4 "+mediaInfo.getWanIp()+"\r\n");
content.append("t=0 0\r\n");
content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n");
content.append("a=sendonly\r\n");
content.append("a=rtpmap:96 PS/90000\r\n");
content.append("y="+ ssrc + "\r\n");
content.append("f=\r\n");
// 通知下级推流,
PlayResult playResult = playService.play(device.getDeviceId(), channelId, (responseJSON)->{
// 收到推流, 回复200OK, 等待ack
if (sendRtpItem == null) return;
sendRtpItem.setStatus(1);
redisCatchStorage.updateSendRTPSever(sendRtpItem);
// TODO 添加对tcp的支持
MediaServerConfig mediaInfo = redisCatchStorage.getMediaInfo();
StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n");
content.append("o="+"00000"+" 0 0 IN IP4 "+mediaInfo.getWanIp()+"\r\n");
content.append("s=Play\r\n");
content.append("c=IN IP4 "+mediaInfo.getWanIp()+"\r\n");
content.append("t=0 0\r\n");
content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n");
content.append("a=sendonly\r\n");
content.append("a=rtpmap:96 PS/90000\r\n");
content.append("y="+ ssrc + "\r\n");
content.append("f=\r\n");
try {
responseAck(evt, content.toString());
} catch (SipException e) {
e.printStackTrace();
} catch (InvalidArgumentException e) {
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
try {
responseAck(evt, content.toString());
} catch (SipException e) {
e.printStackTrace();
} catch (InvalidArgumentException e) {
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
}
} ,(event -> {
// 未知错误。直接转发设备点播的错误
Response response = null;
try {
response = getMessageFactory().createResponse(event.getResponse().getStatusCode(), evt.getRequest());
getServerTransaction(evt).sendResponse(response);
} catch (ParseException | SipException | InvalidArgumentException e) {
e.printStackTrace();
}
}));
if (logger.isDebugEnabled()) {
logger.debug(playResult.getResult().toString());
}
},(event -> {
// 未知错误。直接转发设备点播的错误
Response response = null;
try {
response = getMessageFactory().createResponse(event.getResponse().getStatusCode(), evt.getRequest());
getServerTransaction(evt).sendResponse(response);
} catch (ParseException | SipException | InvalidArgumentException e) {
e.printStackTrace();
}else if (gbStream != null) {
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(addressStr, port, ssrc, requesterId,
gbStream.getApp(), gbStream.getStream(), channelId,
mediaTransmissionTCP);
if (tcpActive != null) {
sendRtpItem.setTcpActive(tcpActive);
}
if (sendRtpItem == null) {
logger.warn("服务器端口资源不足");
responseAck(evt, Response.BUSY_HERE);
return;
}
// 写入redis 超时时回复
redisCatchStorage.updateSendRTPSever(sendRtpItem);
// 检测直播流是否在线
Boolean streamReady = zlmrtpServerFactory.isStreamReady(gbStream.getApp(), gbStream.getStream());
if (streamReady) {
sendRtpItem.setStatus(1);
redisCatchStorage.updateSendRTPSever(sendRtpItem);
// TODO 添加对tcp的支持
MediaServerConfig mediaInfo = redisCatchStorage.getMediaInfo();
StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n");
content.append("o="+"00000"+" 0 0 IN IP4 "+mediaInfo.getWanIp()+"\r\n");
content.append("s=Play\r\n");
content.append("c=IN IP4 "+mediaInfo.getWanIp()+"\r\n");
content.append("t=0 0\r\n");
content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n");
content.append("a=sendonly\r\n");
content.append("a=rtpmap:96 PS/90000\r\n");
content.append("y="+ ssrc + "\r\n");
content.append("f=\r\n");
try {
responseAck(evt, content.toString());
} catch (SipException e) {
e.printStackTrace();
} catch (InvalidArgumentException e) {
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
}
}
}));
if (logger.isDebugEnabled()) {
logger.debug(playResult.getResult().toString());
}
} else {
// 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备)
Device device = storager.queryVideoDevice(requesterId);
@@ -298,6 +354,7 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor {
}
}
/***
* 回复状态码
* 100 trying

View File

@@ -529,13 +529,44 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
String sn = snElement.getText();
// 准备回复通道信息
List<ChannelReduce> channelReduces = storager.queryChannelListInParentPlatform(parentPlatform.getServerGBId());
// 查询关联的直播通道
List<GbStream> gbStreams = storager.queryGbStreamListInPlatform(parentPlatform.getServerGBId());
int size = channelReduces.size() + gbStreams.size();
// 回复级联的通道
if (channelReduces.size() > 0) {
for (ChannelReduce channelReduce : channelReduces) {
DeviceChannel deviceChannel = storager.queryChannel(channelReduce.getDeviceId(), channelReduce.getChannelId());
cmderFroPlatform.catalogQuery(deviceChannel, parentPlatform, sn, fromHeader.getTag(), channelReduces.size());
cmderFroPlatform.catalogQuery(deviceChannel, parentPlatform, sn, fromHeader.getTag(), size);
}
}
// 回复直播的通道
if (gbStreams.size() > 0) {
for (GbStream gbStream : gbStreams) {
DeviceChannel deviceChannel = new DeviceChannel();
deviceChannel.setChannelId(gbStream.getGbId());
deviceChannel.setName(gbStream.getName());
deviceChannel.setLongitude(gbStream.getLongitude());
deviceChannel.setLatitude(gbStream.getLatitude());
deviceChannel.setDeviceId(parentPlatform.getDeviceGBId());
deviceChannel.setManufacture("wvp-pro");
deviceChannel.setStatus(1);
// deviceChannel.setParentId(parentPlatform.getDeviceGBId());
deviceChannel.setRegisterWay(1);
deviceChannel.setCivilCode(cmder.getSipConfig().getSipDomain());
deviceChannel.setModel("live");
deviceChannel.setOwner("wvp-pro");
// deviceChannel.setAddress("test");
deviceChannel.setParental(0);
deviceChannel.setSecrecy("0");
deviceChannel.setSecrecy("0");
cmderFroPlatform.catalogQuery(deviceChannel, parentPlatform, sn, fromHeader.getTag(), size);
}
}
if (size == 0) {
// 回复无通道
cmderFroPlatform.catalogQuery(null, parentPlatform, sn, fromHeader.getTag(), size);
}
}

View File

@@ -96,7 +96,7 @@ public class ZLMRTPServerFactory {
}
/**
* 创建一个推流
* 创建一个国标推流
* @param ip 推流ip
* @param port 推流端口
* @param ssrc 推流唯一标识
@@ -122,6 +122,39 @@ public class ZLMRTPServerFactory {
sendRtpItem.setDeviceId(deviceId);
sendRtpItem.setChannelId(channelId);
sendRtpItem.setTcp(tcp);
sendRtpItem.setApp("rtp");
sendRtpItem.setLocalPort(localPort);
return sendRtpItem;
}
/**
* 创建一个直播推流
* @param ip 推流ip
* @param port 推流端口
* @param ssrc 推流唯一标识
* @param platformId 平台id
* @param channelId 通道id
* @param tcp 是否为tcp
* @return SendRtpItem
*/
public SendRtpItem createSendRtpItem(String ip, int port, String ssrc, String platformId, String app, String stream, String channelId, boolean tcp){
String playSsrc = SsrcUtil.getPlaySsrc();
int localPort = createRTPServer(SsrcUtil.getPlaySsrc());
if (localPort != -1) {
closeRTPServer(playSsrc);
}else {
logger.error("没有可用的端口");
return null;
}
SendRtpItem sendRtpItem = new SendRtpItem();
sendRtpItem.setIp(ip);
sendRtpItem.setPort(port);
sendRtpItem.setSsrc(ssrc);
sendRtpItem.setApp(app);
sendRtpItem.setStreamId(stream);
sendRtpItem.setPlatformId(platformId);
sendRtpItem.setChannelId(channelId);
sendRtpItem.setTcp(tcp);
sendRtpItem.setLocalPort(localPort);
return sendRtpItem;
}
@@ -152,13 +185,21 @@ public class ZLMRTPServerFactory {
return (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online"));
}
/**
* 查询待转推的流是否就绪
*/
public Boolean isStreamReady(String app, String streamId) {
JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(app, "rtmp", streamId);
return (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online"));
}
/**
* 查询转推的流是否有其它观看者
* @param streamId
* @return
*/
public int totalReaderCount(String streamId) {
JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo("rtp", "rtmp", streamId);
public int totalReaderCount(String app, String streamId) {
JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(app, "rtmp", streamId);
return mediaInfo.getInteger("totalReaderCount");
}

View File

@@ -4,7 +4,7 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.conf.MediaServerConfig;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyDto;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
//import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
@@ -165,8 +165,8 @@ public class ZLMRunner implements CommandLineRunner {
// 更新流列表
zlmMediaListManager.updateMediaList();
// 恢复流代理
List<StreamProxyDto> streamProxyListForEnable = storager.getStreamProxyListForEnable(true);
for (StreamProxyDto streamProxyDto : streamProxyListForEnable) {
List<StreamProxyItem> streamProxyListForEnable = storager.getStreamProxyListForEnable(true);
for (StreamProxyItem streamProxyDto : streamProxyListForEnable) {
logger.info("恢复流代理," + streamProxyDto.getApp() + "/" + streamProxyDto.getStream());
streamProxyService.addStreamProxyToZlm(streamProxyDto);
}

View File

@@ -1,6 +1,9 @@
package com.genersoft.iot.vmp.media.zlm.dto;
public class StreamProxyDto {
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
public class StreamProxyItem extends GbStream {
private String type;
private String app;
private String stream;
@@ -109,4 +112,6 @@ public class StreamProxyDto {
public void setEnable_mp4(boolean enable_mp4) {
this.enable_mp4 = enable_mp4;
}
}

View File

@@ -2,12 +2,9 @@ package com.genersoft.iot.vmp.storager;
import java.util.List;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyDto;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.vmanager.platform.bean.ChannelReduce;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.github.pagehelper.PageInfo;
/**
@@ -238,7 +235,7 @@ public interface IVideoManagerStorager {
/**
* 添加Mobile Position设备移动位置
* @param MobilePosition
* @param mobilePosition
* @return
*/
public boolean insertMobilePosition(MobilePosition mobilePosition);
@@ -268,14 +265,14 @@ public interface IVideoManagerStorager {
* @param streamProxyDto
* @return
*/
public int addStreamProxy(StreamProxyDto streamProxyDto);
public boolean addStreamProxy(StreamProxyItem streamProxyDto);
/**
* 更新代理流
* @param streamProxyDto
* @return
*/
public int updateStreamProxy(StreamProxyDto streamProxyDto);
public boolean updateStreamProxy(StreamProxyItem streamProxyDto);
/**
* 移除代理流
@@ -290,7 +287,7 @@ public interface IVideoManagerStorager {
* @param enable
* @return
*/
public List<StreamProxyDto> getStreamProxyListForEnable(boolean enable);
public List<StreamProxyItem> getStreamProxyListForEnable(boolean enable);
/**
* 按照是app和stream获取代理流
@@ -298,7 +295,7 @@ public interface IVideoManagerStorager {
* @param stream
* @return
*/
public StreamProxyDto queryStreamProxy(String app, String stream);
public StreamProxyItem queryStreamProxy(String app, String stream);
/**
* 获取代理流
@@ -306,5 +303,20 @@ public interface IVideoManagerStorager {
* @param count
* @return
*/
PageInfo<StreamProxyDto> queryStreamProxyList(Integer page, Integer count);
PageInfo<StreamProxyItem> queryStreamProxyList(Integer page, Integer count);
/**
* 根据国标ID获取平台关联的直播流
* @param platformId
* @param channelId
* @return
*/
GbStream queryStreamInParentPlatform(String platformId, String channelId);
/**
* 获取平台关联的直播流
* @param platformId
* @return
*/
List<GbStream> queryGbStreamListInPlatform(String platformId);
}

View File

@@ -0,0 +1,49 @@
package com.genersoft.iot.vmp.storager.dao;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import org.apache.ibatis.annotations.*;
import org.springframework.stereotype.Repository;
import java.util.List;
@Mapper
@Repository
public interface GbStreamMapper {
@Insert("INSERT INTO gb_stream (app, stream, gbId, name, " +
"longitude, latitude, streamType) VALUES" +
"('${app}', '${stream}', '${gbId}', '${name}', " +
"'${longitude}', '${latitude}', '${streamType}')")
int add(GbStream gbStream);
@Update("UPDATE gb_stream " +
"SET app=#{app}," +
"stream=#{stream}," +
"gbId=#{gbId}," +
"name=#{name}," +
"streamType=#{streamType}," +
"longitude=#{longitude}, " +
"latitude=#{latitude}, " +
"WHERE app=#{app} AND stream=#{stream} AND gbId=#{gbId}")
int update(GbStream gbStream);
@Delete("DELETE FROM gb_stream WHERE app=#{app} AND stream=#{stream}")
int del(String app, String stream);
@Select("SELECT gs.*, pgs.platformId FROM gb_stream gs LEFT JOIN platform_gb_stream pgs ON gs.app = pgs.app AND gs.stream = pgs.stream")
List<GbStream> selectAll();
@Select("SELECT * FROM gb_stream WHERE app=#{app} AND stream=#{stream}")
StreamProxyItem selectOne(String app, String stream);
@Select("SELECT gs.*, pgs.platformId FROM gb_stream gs " +
"LEFT JOIN platform_gb_stream pgs ON gs.app = pgs.app AND gs.stream = pgs.stream " +
"WHERE gs.gbId = '${gbId}' AND pgs.platformId = '${platformId}'")
GbStream queryStreamInPlatform(String platformId, String gbId);
@Select("SELECT gs.*, pgs.platformId FROM gb_stream gs " +
"LEFT JOIN platform_gb_stream pgs ON gs.app = pgs.app AND gs.stream = pgs.stream " +
"WHERE pgs.platformId = '${platformId}'")
List<GbStream> queryGbStreamListInPlatform(String platformId);
}

View File

@@ -0,0 +1,28 @@
package com.genersoft.iot.vmp.storager.dao;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.PlatformGbStream;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import org.apache.ibatis.annotations.*;
import org.springframework.stereotype.Repository;
import java.util.List;
@Mapper
@Repository
public interface PlarfotmGbStreamMapper {
@Insert("INSERT INTO platform_gb_stream (app, stream, platformId) VALUES" +
"('${app}', '${stream}', '${platformId}')")
int add(PlatformGbStream platformGbStream);
@Delete("DELETE FROM platform_gb_stream WHERE app=#{app} AND stream=#{stream}")
int delByAppAndStream(String app, String stream);
@Delete("DELETE FROM platform_gb_stream WHERE app=#{app} AND stream=#{stream}")
int delByPlatformId(String platformId);
@Select("SELECT * FROM platform_gb_stream WHERE app=#{app} AND stream=#{stream} AND platformId=#{platformId}")
StreamProxyItem selectOne(String app, String stream, String platformId);
}

View File

@@ -13,7 +13,7 @@ import java.util.List;
@Mapper
@Repository
public interface PatformChannelMapper {
public interface PlatformChannelMapper {
/**
* 查询列表里已经关联的

View File

@@ -1,6 +1,6 @@
package com.genersoft.iot.vmp.storager.dao;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyDto;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import org.apache.ibatis.annotations.*;
import org.springframework.stereotype.Repository;
@@ -14,7 +14,7 @@ public interface StreamProxyMapper {
"timeout_ms, ffmpeg_cmd_key, rtp_type, enable_hls, enable_mp4, enable) VALUES" +
"('${type}','${app}', '${stream}', '${url}', '${src_url}', '${dst_url}', " +
"'${timeout_ms}', '${ffmpeg_cmd_key}', '${rtp_type}', ${enable_hls}, ${enable_mp4}, ${enable} )")
int add(StreamProxyDto streamProxyDto);
int add(StreamProxyItem streamProxyDto);
@Update("UPDATE stream_proxy " +
"SET type=#{type}, " +
@@ -30,17 +30,17 @@ public interface StreamProxyMapper {
"enable=#{enable}, " +
"enable_mp4=#{enable_mp4} " +
"WHERE app=#{app} AND stream=#{stream}")
int update(StreamProxyDto streamProxyDto);
int update(StreamProxyItem streamProxyDto);
@Delete("DELETE FROM stream_proxy WHERE app=#{app} AND stream=#{stream}")
int del(String app, String stream);
@Select("SELECT * FROM stream_proxy")
List<StreamProxyDto> selectAll();
@Select("SELECT st.*, pgs.gbId, pgs.name, pgs.longitude, pgs.latitude FROM stream_proxy st LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream")
List<StreamProxyItem> selectAll();
@Select("SELECT * FROM stream_proxy WHERE enable=${enable}")
List<StreamProxyDto> selectForEnable(boolean enable);
@Select("SELECT st.*, pgs.gbId, pgs.name, pgs.longitude, pgs.latitude FROM stream_proxy st LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.enable=${enable}")
List<StreamProxyItem> selectForEnable(boolean enable);
@Select("SELECT * FROM stream_proxy WHERE app=#{app} AND stream=#{stream}")
StreamProxyDto selectOne(String app, String stream);
@Select("SELECT st.*, pgs.gbId, pgs.name, pgs.longitude, pgs.latitude FROM stream_proxy st LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.app=#{app} AND st.stream=#{stream}")
StreamProxyItem selectOne(String app, String stream);
}

View File

@@ -2,21 +2,20 @@ package com.genersoft.iot.vmp.storager.impl;
import java.util.*;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyDto;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.storager.dao.*;
import com.genersoft.iot.vmp.vmanager.platform.bean.ChannelReduce;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Component;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
/**
@@ -27,6 +26,11 @@ import org.springframework.transaction.annotation.Transactional;
@SuppressWarnings("rawtypes")
@Component
public class VideoManagerStoragerImpl implements IVideoManagerStorager {
@Autowired
DataSourceTransactionManager dataSourceTransactionManager;
@Autowired
TransactionDefinition transactionDefinition;
@Autowired
private DeviceMapper deviceMapper;
@@ -44,12 +48,13 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
private IRedisCatchStorage redisCatchStorage;
@Autowired
private PatformChannelMapper patformChannelMapper;
private PlatformChannelMapper platformChannelMapper;
@Autowired
private StreamProxyMapper streamProxyMapper;
@Autowired
private GbStreamMapper gbStreamMapper;
/**
@@ -283,7 +288,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
public boolean deleteParentPlatform(ParentPlatform parentPlatform) {
int result = platformMapper.delParentPlatform(parentPlatform);
// 删除关联的通道
patformChannelMapper.cleanChannelForGB(parentPlatform.getServerGBId());
platformChannelMapper.cleanChannelForGB(parentPlatform.getServerGBId());
return result > 0;
}
@@ -333,7 +338,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
}
List<String> deviceAndChannelList = new ArrayList<>(deviceAndChannels.keySet());
// 查询当前已经存在的
List<String> relatedPlatformchannels = patformChannelMapper.findChannelRelatedPlatform(platformId, deviceAndChannelList);
List<String> relatedPlatformchannels = platformChannelMapper.findChannelRelatedPlatform(platformId, deviceAndChannelList);
if (relatedPlatformchannels != null) {
deviceAndChannelList.removeAll(relatedPlatformchannels);
}
@@ -344,7 +349,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
// 对剩下的数据进行存储
int result = 0;
if (channelReducesToAdd.size() > 0) {
result = patformChannelMapper.addChannels(platformId, channelReducesToAdd);
result = platformChannelMapper.addChannels(platformId, channelReducesToAdd);
}
return result;
@@ -354,20 +359,20 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
@Override
public int delChannelForGB(String platformId, List<ChannelReduce> channelReduces) {
int result = patformChannelMapper.delChannelForGB(platformId, channelReduces);
int result = platformChannelMapper.delChannelForGB(platformId, channelReduces);
return result;
}
@Override
public DeviceChannel queryChannelInParentPlatform(String platformId, String channelId) {
DeviceChannel channel = patformChannelMapper.queryChannelInParentPlatform(platformId, channelId);
DeviceChannel channel = platformChannelMapper.queryChannelInParentPlatform(platformId, channelId);
return channel;
}
@Override
public Device queryVideoDeviceByPlatformIdAndChannelId(String platformId, String channelId) {
Device device = patformChannelMapper.queryVideoDeviceByPlatformIdAndChannelId(platformId, channelId);
Device device = platformChannelMapper.queryVideoDeviceByPlatformIdAndChannelId(platformId, channelId);
return device;
}
@@ -390,27 +395,54 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
/**
* 新增代理流
* @param streamProxyDto
* @param streamProxyItem
* @return
*/
@Override
public int addStreamProxy(StreamProxyDto streamProxyDto) {
return streamProxyMapper.add(streamProxyDto);
public boolean addStreamProxy(StreamProxyItem streamProxyItem) {
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
boolean result = false;
streamProxyItem.setStreamType("proxy");
try {
if (gbStreamMapper.add(streamProxyItem)<0 || streamProxyMapper.add(streamProxyItem) < 0) {
//事务回滚
dataSourceTransactionManager.rollback(transactionStatus);
}
result = true;
dataSourceTransactionManager.commit(transactionStatus); //手动提交
}catch (Exception e) {
dataSourceTransactionManager.rollback(transactionStatus);
}
return result;
}
/**
* 更新代理流
* @param streamProxyDto
* @param streamProxyItem
* @return
*/
@Override
public int updateStreamProxy(StreamProxyDto streamProxyDto) {
return streamProxyMapper.update(streamProxyDto);
public boolean updateStreamProxy(StreamProxyItem streamProxyItem) {
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
boolean result = false;
streamProxyItem.setStreamType("proxy");
try {
if (gbStreamMapper.update(streamProxyItem)<0 || streamProxyMapper.update(streamProxyItem) < 0) {
//事务回滚
dataSourceTransactionManager.rollback(transactionStatus);
}
dataSourceTransactionManager.commit(transactionStatus); //手动提交
result = true;
}catch (Exception e) {
dataSourceTransactionManager.rollback(transactionStatus);
}
return result;
}
/**
* 移除代理流
* @param id
* @param app
* @param stream
* @return
*/
@Override
@@ -424,7 +456,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
* @return
*/
@Override
public List<StreamProxyDto> getStreamProxyListForEnable(boolean enable) {
public List<StreamProxyItem> getStreamProxyListForEnable(boolean enable) {
return streamProxyMapper.selectForEnable(enable);
}
@@ -435,12 +467,32 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
* @return
*/
@Override
public PageInfo<StreamProxyDto> queryStreamProxyList(Integer page, Integer count) {
public PageInfo<StreamProxyItem> queryStreamProxyList(Integer page, Integer count) {
PageHelper.startPage(page, count);
List<StreamProxyDto> all = streamProxyMapper.selectAll();
List<StreamProxyItem> all = streamProxyMapper.selectAll();
return new PageInfo<>(all);
}
/**
* 根据国标ID获取平台关联的直播流
* @param platformId
* @param gbId
* @return
*/
@Override
public GbStream queryStreamInParentPlatform(String platformId, String gbId) {
return gbStreamMapper.queryStreamInPlatform(platformId, gbId);
}
/**
* 获取平台关联的直播流
* @param platformId
* @return
*/
@Override
public List<GbStream> queryGbStreamListInPlatform(String platformId) {
return gbStreamMapper.queryGbStreamListInPlatform(platformId);
}
/**
* 按照是app和stream获取代理流
@@ -449,7 +501,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
* @return
*/
@Override
public StreamProxyDto queryStreamProxy(String app, String stream){
public StreamProxyItem queryStreamProxy(String app, String stream){
return streamProxyMapper.selectOne(app, stream);
}
}

View File

@@ -0,0 +1,65 @@
package com.genersoft.iot.vmp.vmanager.gbStream;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.vmanager.gbStream.bean.GbStreamParam;
import com.genersoft.iot.vmp.vmanager.platform.bean.UpdateChannelParam;
import com.genersoft.iot.vmp.vmanager.service.IGbStreamService;
import com.github.pagehelper.PageInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@CrossOrigin
@RestController
@RequestMapping("/api/gbStream")
public class GbStreamController {
private final static Logger logger = LoggerFactory.getLogger(GbStreamController.class);
@Autowired
private IGbStreamService gbStreamService;
@Autowired
private IVideoManagerStorager storager;
@RequestMapping(value = "/list")
@ResponseBody
public PageInfo<GbStream> list(@RequestParam(required = false)Integer page,
@RequestParam(required = false)Integer count){
return gbStreamService.getAll(page, count);
}
@RequestMapping(value = "/del")
@ResponseBody
public Object del(@RequestBody GbStreamParam gbStreamParam){
System.out.println(2222);
System.out.println(gbStreamParam.getGbStreams().size());
if (gbStreamService.delPlatformInfo(gbStreamParam.getGbStreams())) {
return "success";
}else {
return "fail";
}
}
@RequestMapping(value = "/add")
@ResponseBody
public Object add(@RequestBody GbStreamParam gbStreamParam){
System.out.println(3333);
System.out.println(gbStreamParam.getGbStreams().size());
if (gbStreamService.addPlatformInfo(gbStreamParam.getGbStreams(), gbStreamParam.getPlatformId())) {
return "success";
}else {
return "fail";
}
}
}

View File

@@ -0,0 +1,28 @@
package com.genersoft.iot.vmp.vmanager.gbStream.bean;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import java.util.List;
public class GbStreamParam {
private String platformId;
private List<GbStream> gbStreams;
public String getPlatformId() {
return platformId;
}
public void setPlatformId(String platformId) {
this.platformId = platformId;
}
public List<GbStream> getGbStreams() {
return gbStreams;
}
public void setGbStreams(List<GbStream> gbStreams) {
this.gbStreams = gbStreams;
}
}

View File

@@ -46,7 +46,4 @@ public class MediaController {
return mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream);
}
}

View File

@@ -0,0 +1,35 @@
package com.genersoft.iot.vmp.vmanager.platformGbStream;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.PlatformGbStream;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.vmanager.service.IGbStreamService;
import com.github.pagehelper.PageInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@CrossOrigin
@RestController
@RequestMapping("/api")
public class PlatformGbStreamController {
private final static Logger logger = LoggerFactory.getLogger(PlatformGbStreamController.class);
@Autowired
private IGbStreamService gbStreamService;
@Autowired
private IVideoManagerStorager storager;
@RequestMapping(value = "/list")
@ResponseBody
public PageInfo<GbStream> list(@RequestParam(required = false)Integer page,
@RequestParam(required = false)Integer count){
return gbStreamService.getAll(page, count);
}
}

View File

@@ -163,20 +163,7 @@ public class PlayController {
JSONObject data = jsonObject.getJSONObject("data");
if (data != null) {
result.put("key", data.getString("key"));
// StreamInfo streamInfoResult = new StreamInfo();
// streamInfoResult.setRtmp(dstUrl);
// streamInfoResult.setRtsp(String.format("rtsp://%s:%s/convert/%s", mediaInfo.getWanIp(), mediaInfo.getRtspPort(), streamId));
// streamInfoResult.setStreamId(streamId);
// streamInfoResult.setFlv(String.format("http://%s:%s/convert/%s.flv", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
// streamInfoResult.setWs_flv(String.format("ws://%s:%s/convert/%s.flv", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
// streamInfoResult.setHls(String.format("http://%s:%s/convert/%s/hls.m3u8", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
// streamInfoResult.setWs_hls(String.format("ws://%s:%s/convert/%s/hls.m3u8", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
// streamInfoResult.setFmp4(String.format("http://%s:%s/convert/%s.live.mp4", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
// streamInfoResult.setWs_fmp4(String.format("ws://%s:%s/convert/%s.live.mp4", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
// streamInfoResult.setTs(String.format("http://%s:%s/convert/%s.live.ts", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
// streamInfoResult.setWs_ts(String.format("ws://%s:%s/convert/%s.live.ts", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId));
StreamInfo streamInfoResult = mediaService.getStreamInfoByAppAndStream("convert", streamId);
streamInfoResult.setStreamId(streamId);
result.put("data", streamInfoResult);
}
}else {

View File

@@ -0,0 +1,41 @@
package com.genersoft.iot.vmp.vmanager.service;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.PlatformGbStream;
import com.github.pagehelper.PageInfo;
import java.util.List;
/**
* 级联国标平台关联流业务接口
*/
public interface IGbStreamService {
/**
* 分页获取所有
* @param page
* @param count
* @return
*/
PageInfo<GbStream> getAll(Integer page, Integer count);
/**
* 移除
* @param app
* @param stream
*/
void del(String app, String stream);
/**
* 保存国标关联
* @param gbStreams
*/
boolean addPlatformInfo(List<GbStream> gbStreams, String platformId);
/**
* 移除国标关联
* @param gbStreams
*/
boolean delPlatformInfo(List<GbStream> gbStreams);
}

View File

@@ -1,8 +1,7 @@
package com.genersoft.iot.vmp.vmanager.service;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyDto;
import com.genersoft.iot.vmp.vmanager.streamProxy.StreamProxyController;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.github.pagehelper.PageInfo;
public interface IStreamProxyService {
@@ -11,21 +10,21 @@ public interface IStreamProxyService {
* 保存视频代理
* @param param
*/
void save(StreamProxyDto param);
void save(StreamProxyItem param);
/**
* 添加视频代理到zlm
* @param param
* @return
*/
JSONObject addStreamProxyToZlm(StreamProxyDto param);
JSONObject addStreamProxyToZlm(StreamProxyItem param);
/**
* 从zlm移除视频代理
* @param param
* @return
*/
JSONObject removeStreamProxyFromZlm(StreamProxyDto param);
JSONObject removeStreamProxyFromZlm(StreamProxyItem param);
/**
* 分页查询
@@ -33,7 +32,7 @@ public interface IStreamProxyService {
* @param count
* @return
*/
PageInfo<StreamProxyDto> getAll(Integer page, Integer count);
PageInfo<StreamProxyItem> getAll(Integer page, Integer count);
/**
* 删除视频代理

View File

@@ -0,0 +1,89 @@
package com.genersoft.iot.vmp.vmanager.service.impl;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.PlatformGbStream;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.PlarfotmGbStreamMapper;
import com.genersoft.iot.vmp.vmanager.platform.PlatformController;
import com.genersoft.iot.vmp.vmanager.service.IGbStreamService;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import java.util.List;
@Service
public class GbStreamServiceImpl implements IGbStreamService {
private final static Logger logger = LoggerFactory.getLogger(GbStreamServiceImpl.class);
@Autowired
DataSourceTransactionManager dataSourceTransactionManager;
@Autowired
TransactionDefinition transactionDefinition;
@Autowired
private GbStreamMapper gbStreamMapper;
@Autowired
private PlarfotmGbStreamMapper plarfotmGbStreamMapper;
@Override
public PageInfo<GbStream> getAll(Integer page, Integer count) {
PageHelper.startPage(page, count);
List<GbStream> all = gbStreamMapper.selectAll();
return new PageInfo<>(all);
}
@Override
public void del(String app, String stream) {
gbStreamMapper.del(app, stream);
}
@Override
public boolean addPlatformInfo(List<GbStream> gbStreams, String platformId) {
// 放在事务内执行
boolean result = false;
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
try {
for (GbStream gbStream : gbStreams) {
gbStream.setPlatformId(platformId);
plarfotmGbStreamMapper.add(gbStream);
}
dataSourceTransactionManager.commit(transactionStatus); //手动提交
result = true;
}catch (Exception e) {
logger.error("批量保存流与平台的关系时错误", e);
dataSourceTransactionManager.rollback(transactionStatus);
}
return result;
}
@Override
public boolean delPlatformInfo(List<GbStream> gbStreams) {
// 放在事务内执行
boolean result = false;
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
try {
for (GbStream gbStream : gbStreams) {
plarfotmGbStreamMapper.delByAppAndStream(gbStream.getApp(), gbStream.getStream());
}
dataSourceTransactionManager.commit(transactionStatus); //手动提交
result = true;
}catch (Exception e) {
logger.error("批量移除流与平台的关系时错误", e);
dataSourceTransactionManager.rollback(transactionStatus);
}
return result;
}
}

View File

@@ -27,6 +27,8 @@ public class MediaServiceImpl implements IMediaService {
public StreamInfo getStreamInfoByAppAndStream(String app, String stream) {
MediaServerConfig mediaInfo = redisCatchStorage.getMediaInfo();
StreamInfo streamInfoResult = new StreamInfo();
streamInfoResult.setStreamId(stream);
streamInfoResult.setApp(app);
streamInfoResult.setRtmp(String.format("rtmp://%s:%s/%s/%s", mediaInfo.getWanIp(), mediaInfo.getRtmpPort(), app, stream));
streamInfoResult.setRtsp(String.format("rtsp://%s:%s/%s/%s", mediaInfo.getWanIp(), mediaInfo.getRtspPort(), app, stream));
streamInfoResult.setFlv(String.format("http://%s:%s/%s/%s.flv", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), app, stream));

View File

@@ -153,27 +153,8 @@ public class PlayServiceImpl implements IPlayService {
public StreamInfo onPublishHandler(JSONObject resonse, String deviceId, String channelId, String uuid) {
String streamId = resonse.getString("id");
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream("rtp", streamId);
// StreamInfo streamInfo = new StreamInfo();
streamInfo.setStreamId(streamId);
streamInfo.setDeviceID(deviceId);
streamInfo.setChannelId(channelId);
// MediaServerConfig mediaServerConfig = redisCatchStorage.getMediaInfo();
// streamInfo.setFlv(String.format("http://%s:%s/rtp/%s.flv", mediaServerConfig.getWanIp(), mediaServerConfig.getHttpPort(), streamId));
// streamInfo.setWs_flv(String.format("ws://%s:%s/rtp/%s.flv", mediaServerConfig.getWanIp(), mediaServerConfig.getHttpPort(), streamId));
//
// streamInfo.setFmp4(String.format("http://%s:%s/rtp/%s.live.mp4", mediaServerConfig.getWanIp(), mediaServerConfig.getHttpPort(), streamId));
// streamInfo.setWs_fmp4(String.format("ws://%s:%s/rtp/%s.live.mp4", mediaServerConfig.getWanIp(), mediaServerConfig.getHttpPort(), streamId));
//
// streamInfo.setHls(String.format("http://%s:%s/rtp/%s/hls.m3u8", mediaServerConfig.getWanIp(), mediaServerConfig.getHttpPort(), streamId));
// streamInfo.setWs_hls(String.format("ws://%s:%s/rtp/%s/hls.m3u8", mediaServerConfig.getWanIp(), mediaServerConfig.getHttpPort(), streamId));
//
// streamInfo.setTs(String.format("http://%s:%s/rtp/%s.live.ts", mediaServerConfig.getWanIp(), mediaServerConfig.getHttpPort(), streamId));
// streamInfo.setWs_ts(String.format("ws://%s:%s/rtp/%s.live.ts", mediaServerConfig.getWanIp(), mediaServerConfig.getHttpPort(), streamId));
//
// streamInfo.setRtmp(String.format("rtmp://%s:%s/rtp/%s", mediaServerConfig.getWanIp(), mediaServerConfig.getRtmpPort(), streamId));
// streamInfo.setRtsp(String.format("rtsp://%s:%s/rtp/%s", mediaServerConfig.getWanIp(), mediaServerConfig.getRtspPort(), streamId));
return streamInfo;
}

View File

@@ -2,21 +2,18 @@ package com.genersoft.iot.vmp.vmanager.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.conf.MediaServerConfig;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyDto;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.PlarfotmGbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper;
import com.genersoft.iot.vmp.vmanager.service.IStreamProxyService;
import com.genersoft.iot.vmp.vmanager.streamProxy.StreamProxyController;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* 视频代理业务
*/
@@ -35,29 +32,35 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Autowired
private StreamProxyMapper streamProxyMapper;
@Autowired
private GbStreamMapper gbStreamMapper;
@Autowired
private PlarfotmGbStreamMapper plarfotmGbStreamMapper;
@Override
public void save(StreamProxyDto param) {
public void save(StreamProxyItem param) {
MediaServerConfig mediaInfo = redisCatchStorage.getMediaInfo();
String dstUrl = String.format("rtmp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtmpPort(), param.getApp(),
param.getStream() );
param.setDst_url(dstUrl);
// 更新
if (videoManagerStorager.queryStreamProxy(param.getApp(), param.getStream()) != null) {
int result = videoManagerStorager.updateStreamProxy(param);
if (result > 0 && param.isEnable()) {
boolean result = videoManagerStorager.updateStreamProxy(param);
if (result && param.isEnable()) {
addStreamProxyToZlm(param);
}
}else { // 新增
int result = videoManagerStorager.addStreamProxy(param);
if (result > 0 && param.isEnable()) {
boolean result = videoManagerStorager.addStreamProxy(param);
if (result && param.isEnable()) {
addStreamProxyToZlm(param);
}
}
}
@Override
public JSONObject addStreamProxyToZlm(StreamProxyDto param) {
public JSONObject addStreamProxyToZlm(StreamProxyItem param) {
JSONObject result = null;
if ("default".equals(param.getType())){
result = zlmresTfulUtils.addStreamProxy(param.getApp(), param.getStream(), param.getUrl(),
@@ -70,37 +73,42 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
@Override
public JSONObject removeStreamProxyFromZlm(StreamProxyDto param) {
public JSONObject removeStreamProxyFromZlm(StreamProxyItem param) {
JSONObject result = zlmresTfulUtils.closeStreams(param.getApp(), param.getStream());
return result;
}
@Override
public PageInfo<StreamProxyDto> getAll(Integer page, Integer count) {
public PageInfo<StreamProxyItem> getAll(Integer page, Integer count) {
return videoManagerStorager.queryStreamProxyList(page, count);
}
@Override
public void del(String app, String stream) {
StreamProxyDto streamProxyDto = new StreamProxyDto();
streamProxyDto.setApp(app);
streamProxyDto.setStream(stream);
JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyDto);
StreamProxyItem streamProxyItem = new StreamProxyItem();
streamProxyItem.setApp(app);
streamProxyItem.setStream(stream);
JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem);
if (jsonObject.getInteger("code") == 0) {
videoManagerStorager.deleteStreamProxy(app, stream);
// 如果关联了国标那么移除关联
gbStreamMapper.del(app, stream);
plarfotmGbStreamMapper.delByAppAndStream(app, stream);
// TODO 如果关联的推流, 那么状态设置为离线
}
}
@Override
public boolean start(String app, String stream) {
boolean result = false;
StreamProxyDto streamProxyDto = videoManagerStorager.queryStreamProxy(app, stream);
if (!streamProxyDto.isEnable() && streamProxyDto != null) {
JSONObject jsonObject = addStreamProxyToZlm(streamProxyDto);
StreamProxyItem streamProxy = videoManagerStorager.queryStreamProxy(app, stream);
if (!streamProxy.isEnable() && streamProxy != null) {
JSONObject jsonObject = addStreamProxyToZlm(streamProxy);
if (jsonObject.getInteger("code") == 0) {
result = true;
streamProxyDto.setEnable(true);
videoManagerStorager.updateStreamProxy(streamProxyDto);
streamProxy.setEnable(true);
videoManagerStorager.updateStreamProxy(streamProxy);
}
}
return result;
@@ -109,7 +117,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Override
public boolean stop(String app, String stream) {
boolean result = false;
StreamProxyDto streamProxyDto = videoManagerStorager.queryStreamProxy(app, stream);
StreamProxyItem streamProxyDto = videoManagerStorager.queryStreamProxy(app, stream);
if (streamProxyDto.isEnable() && streamProxyDto != null) {
JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyDto);
if (jsonObject.getInteger("code") == 0) {

View File

@@ -1,8 +1,7 @@
package com.genersoft.iot.vmp.vmanager.streamProxy;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyDto;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.vmanager.service.IStreamProxyService;
import com.github.pagehelper.PageInfo;
@@ -31,17 +30,17 @@ public class StreamProxyController {
@RequestMapping(value = "/list")
@ResponseBody
public PageInfo<StreamProxyDto> list(@RequestParam(required = false)Integer page,
@RequestParam(required = false)Integer count,
@RequestParam(required = false)String q,
@RequestParam(required = false)Boolean online ){
public PageInfo<StreamProxyItem> list(@RequestParam(required = false)Integer page,
@RequestParam(required = false)Integer count,
@RequestParam(required = false)String q,
@RequestParam(required = false)Boolean online ){
return streamProxyService.getAll(page, count);
}
@RequestMapping(value = "/save")
@ResponseBody
public Object save(@RequestBody StreamProxyDto param){
public Object save(@RequestBody StreamProxyItem param){
logger.info("添加代理: " + JSONObject.toJSONString(param));
streamProxyService.save(param);
return "success";