Merge branch 'wvp-28181-2.0' into main-dev
# Conflicts: # src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java # src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java # src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java # src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java # src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java # src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java # src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java # src/main/java/com/genersoft/iot/vmp/service/IPlayService.java # src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java # src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java # src/main/resources/all-application.yml # web_src/config/index.js # web_src/src/components/dialog/devicePlayer.vue
This commit is contained in:
@@ -2,7 +2,7 @@ package com.genersoft.iot.vmp.service;
|
||||
|
||||
import com.genersoft.iot.vmp.gb28181.bean.Device;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaceInfo;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
|
||||
import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
|
||||
|
||||
import java.util.List;
|
||||
@@ -38,7 +38,7 @@ public interface IDeviceChannelService {
|
||||
* 获取统计信息
|
||||
* @return
|
||||
*/
|
||||
ResourceBaceInfo getOverview();
|
||||
ResourceBaseInfo getOverview();
|
||||
|
||||
/**
|
||||
* 查询所有未分配的通道
|
||||
|
||||
@@ -5,7 +5,7 @@ import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.SyncStatus;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.BaseTree;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaceInfo;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@@ -162,7 +162,7 @@ public interface IDeviceService {
|
||||
* 获取统计信息
|
||||
* @return
|
||||
*/
|
||||
ResourceBaceInfo getOverview();
|
||||
ResourceBaseInfo getOverview();
|
||||
|
||||
/**
|
||||
* 获取所有设备
|
||||
|
||||
@@ -2,7 +2,7 @@ package com.genersoft.iot.vmp.service;
|
||||
|
||||
import com.genersoft.iot.vmp.common.InviteInfo;
|
||||
import com.genersoft.iot.vmp.common.InviteSessionType;
|
||||
import com.genersoft.iot.vmp.service.bean.InviteErrorCallback;
|
||||
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
|
||||
|
||||
/**
|
||||
* 记录国标点播的状态,包括实时预览,下载,录像回放
|
||||
@@ -14,6 +14,8 @@ public interface IInviteStreamService {
|
||||
*/
|
||||
void updateInviteInfo(InviteInfo inviteInfo);
|
||||
|
||||
InviteInfo updateInviteInfoForStream(InviteInfo inviteInfo, String stream);
|
||||
|
||||
/**
|
||||
* 获取点播的状态信息
|
||||
*/
|
||||
@@ -54,7 +56,7 @@ public interface IInviteStreamService {
|
||||
/**
|
||||
* 添加一个invite回调
|
||||
*/
|
||||
void once(InviteSessionType type, String deviceId, String channelId, String stream, InviteErrorCallback<Object> callback);
|
||||
void once(InviteSessionType type, String deviceId, String channelId, String stream, ErrorCallback<Object> callback);
|
||||
|
||||
/**
|
||||
* 调用一个invite回调
|
||||
@@ -65,4 +67,12 @@ public interface IInviteStreamService {
|
||||
* 清空一个设备的所有invite信息
|
||||
*/
|
||||
void clearInviteInfo(String deviceId);
|
||||
|
||||
/**
|
||||
* 统计同一个zlm下的国标收流个数
|
||||
*/
|
||||
int getStreamInfoCount(String mediaServerId);
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -7,7 +7,7 @@ import com.genersoft.iot.vmp.gb28181.bean.Device;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
||||
import com.genersoft.iot.vmp.service.bean.InviteErrorCallback;
|
||||
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
|
||||
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
|
||||
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent;
|
||||
@@ -25,8 +25,8 @@ import java.util.Map;
|
||||
public interface IPlayService {
|
||||
|
||||
void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
|
||||
InviteErrorCallback<Object> callback);
|
||||
SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, InviteErrorCallback<Object> callback);
|
||||
ErrorCallback<Object> callback);
|
||||
SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, ErrorCallback<Object> callback);
|
||||
|
||||
StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId);
|
||||
|
||||
@@ -37,13 +37,13 @@ public interface IPlayService {
|
||||
*/
|
||||
MediaServerItem getNewMediaServerItemHasAssist(Device device);
|
||||
|
||||
void playBack(String deviceId, String channelId, String startTime, String endTime, InviteErrorCallback<Object> callback);
|
||||
void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, InviteErrorCallback<Object> callback);
|
||||
void playBack(String deviceId, String channelId, String startTime, String endTime, ErrorCallback<Object> callback);
|
||||
void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, ErrorCallback<Object> callback);
|
||||
|
||||
void zlmServerOffline(String mediaServerId);
|
||||
|
||||
void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteErrorCallback<Object> callback);
|
||||
void download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteErrorCallback<Object> callback);
|
||||
void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback);
|
||||
void download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback);
|
||||
|
||||
StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream);
|
||||
|
||||
@@ -69,4 +69,6 @@ public interface IPlayService {
|
||||
void talkCmd(Device device, String channelId, MediaServerItem mediaServerItem, String stream, AudioBroadcastEvent event);
|
||||
|
||||
void stopTalk(Device device, String channelId, Boolean streamIsReady);
|
||||
|
||||
void getSnap(String deviceId, String channelId, String fileName, ErrorCallback errorCallback);
|
||||
}
|
||||
|
||||
@@ -1,8 +0,0 @@
|
||||
package com.genersoft.iot.vmp.service;
|
||||
|
||||
import com.genersoft.iot.vmp.storager.dao.dto.RecordInfo;
|
||||
import com.github.pagehelper.PageInfo;
|
||||
|
||||
public interface IRecordInfoServer {
|
||||
PageInfo<RecordInfo> getRecordList(int page, int count);
|
||||
}
|
||||
@@ -1,10 +1,11 @@
|
||||
package com.genersoft.iot.vmp.service;
|
||||
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.genersoft.iot.vmp.common.GeneralCallback;
|
||||
import com.genersoft.iot.vmp.common.StreamInfo;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaceInfo;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
|
||||
import com.github.pagehelper.PageInfo;
|
||||
|
||||
public interface IStreamProxyService {
|
||||
@@ -13,7 +14,7 @@ public interface IStreamProxyService {
|
||||
* 保存视频代理
|
||||
* @param param
|
||||
*/
|
||||
StreamInfo save(StreamProxyItem param);
|
||||
void save(StreamProxyItem param, GeneralCallback<StreamInfo> callback);
|
||||
|
||||
/**
|
||||
* 添加视频代理到zlm
|
||||
@@ -108,6 +109,6 @@ public interface IStreamProxyService {
|
||||
* 获取统计信息
|
||||
* @return
|
||||
*/
|
||||
ResourceBaceInfo getOverview();
|
||||
ResourceBaseInfo getOverview();
|
||||
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
|
||||
import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaceInfo;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
|
||||
import com.github.pagehelper.PageInfo;
|
||||
|
||||
import java.util.List;
|
||||
@@ -113,5 +113,5 @@ public interface IStreamPushService {
|
||||
* 获取统计信息
|
||||
* @return
|
||||
*/
|
||||
ResourceBaceInfo getOverview();
|
||||
ResourceBaseInfo getOverview();
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package com.genersoft.iot.vmp.service.bean;
|
||||
|
||||
public interface InviteErrorCallback<T> {
|
||||
public interface ErrorCallback<T> {
|
||||
|
||||
void run(int code, String msg, T data);
|
||||
}
|
||||
@@ -5,6 +5,7 @@ package com.genersoft.iot.vmp.service.bean;
|
||||
*/
|
||||
public enum InviteErrorCode {
|
||||
SUCCESS(0, "成功"),
|
||||
FAIL(-100, "失败"),
|
||||
ERROR_FOR_SIGNALLING_TIMEOUT(-1, "信令超时"),
|
||||
ERROR_FOR_STREAM_TIMEOUT(-2, "收流超时"),
|
||||
ERROR_FOR_RESOURCE_EXHAUSTION(-3, "资源耗尽"),
|
||||
|
||||
@@ -11,7 +11,7 @@ import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||
import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
|
||||
import com.genersoft.iot.vmp.storager.dao.DeviceMapper;
|
||||
import com.genersoft.iot.vmp.utils.DateUtil;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaceInfo;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
|
||||
import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@@ -50,8 +50,6 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
|
||||
device = deviceMapper.getDeviceByDeviceId(deviceChannel.getDeviceId());
|
||||
}
|
||||
|
||||
|
||||
|
||||
if ("WGS84".equals(device.getGeoCoordSys())) {
|
||||
deviceChannel.setLongitudeWgs84(deviceChannel.getLongitude());
|
||||
deviceChannel.setLatitudeWgs84(deviceChannel.getLatitude());
|
||||
@@ -175,8 +173,12 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResourceBaceInfo getOverview() {
|
||||
return channelMapper.getOverview();
|
||||
public ResourceBaseInfo getOverview() {
|
||||
|
||||
int online = channelMapper.getOnlineCount();
|
||||
int total = channelMapper.getAllChannelCount();
|
||||
|
||||
return new ResourceBaseInfo(total, online);
|
||||
}
|
||||
|
||||
|
||||
@@ -258,4 +260,6 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
package com.genersoft.iot.vmp.service.impl;
|
||||
|
||||
import com.genersoft.iot.vmp.common.InviteSessionType;
|
||||
import com.genersoft.iot.vmp.common.VideoManagerConstants;
|
||||
import com.genersoft.iot.vmp.conf.DynamicTask;
|
||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.*;
|
||||
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
|
||||
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
|
||||
@@ -10,6 +12,7 @@ import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
|
||||
import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask;
|
||||
import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler;
|
||||
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
||||
@@ -23,7 +26,7 @@ import com.genersoft.iot.vmp.storager.dao.DeviceMapper;
|
||||
import com.genersoft.iot.vmp.storager.dao.PlatformChannelMapper;
|
||||
import com.genersoft.iot.vmp.utils.DateUtil;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.BaseTree;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaceInfo;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@@ -48,6 +51,8 @@ public class DeviceServiceImpl implements IDeviceService {
|
||||
|
||||
private final static Logger logger = LoggerFactory.getLogger(DeviceServiceImpl.class);
|
||||
|
||||
@Autowired
|
||||
private SIPCommander cmder;
|
||||
@Autowired
|
||||
private DynamicTask dynamicTask;
|
||||
|
||||
@@ -124,9 +129,10 @@ public class DeviceServiceImpl implements IDeviceService {
|
||||
}
|
||||
|
||||
// 第一次上线 或则设备之前是离线状态--进行通道同步和设备信息查询
|
||||
if (device.getCreateTime() == null) {
|
||||
device.setOnline(1);
|
||||
if (deviceInDb == null) {
|
||||
device.setOnLine(true);
|
||||
device.setCreateTime(now);
|
||||
device.setUpdateTime(now);
|
||||
logger.info("[设备上线,首次注册]: {},查询设备信息以及通道信息", device.getDeviceId());
|
||||
deviceMapper.add(device);
|
||||
redisCatchStorage.updateDevice(device);
|
||||
@@ -137,8 +143,12 @@ public class DeviceServiceImpl implements IDeviceService {
|
||||
}
|
||||
sync(device);
|
||||
}else {
|
||||
if(device.getOnline() == 0){
|
||||
device.setOnline(1);
|
||||
|
||||
if (deviceInDb != null) {
|
||||
device.setSwitchPrimarySubStream(deviceInDb.isSwitchPrimarySubStream());
|
||||
}
|
||||
if(!device.isOnLine()){
|
||||
device.setOnLine(true);
|
||||
device.setCreateTime(now);
|
||||
deviceMapper.update(device);
|
||||
redisCatchStorage.updateDevice(device);
|
||||
@@ -185,14 +195,14 @@ public class DeviceServiceImpl implements IDeviceService {
|
||||
|
||||
@Override
|
||||
public void offline(String deviceId, String reason) {
|
||||
logger.error("[设备离线],{}, device:{}", reason, deviceId);
|
||||
logger.warn("[设备离线],{}, device:{}", reason, deviceId);
|
||||
Device device = deviceMapper.getDeviceByDeviceId(deviceId);
|
||||
if (device == null) {
|
||||
return;
|
||||
}
|
||||
String registerExpireTaskKey = VideoManagerConstants.REGISTER_EXPIRE_TASK_KEY_PREFIX + deviceId;
|
||||
dynamicTask.stop(registerExpireTaskKey);
|
||||
device.setOnline(0);
|
||||
device.setOnLine(false);
|
||||
redisCatchStorage.updateDevice(device);
|
||||
deviceMapper.update(device);
|
||||
//进行通道离线
|
||||
@@ -256,7 +266,7 @@ public class DeviceServiceImpl implements IDeviceService {
|
||||
}
|
||||
logger.info("[移除目录订阅]: {}", device.getDeviceId());
|
||||
String taskKey = device.getDeviceId() + "catalog";
|
||||
if (device.getOnline() == 1) {
|
||||
if (device.isOnLine()) {
|
||||
Runnable runnable = dynamicTask.get(taskKey);
|
||||
if (runnable instanceof ISubscribeTask) {
|
||||
ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
|
||||
@@ -289,7 +299,7 @@ public class DeviceServiceImpl implements IDeviceService {
|
||||
}
|
||||
logger.info("[移除移动位置订阅]: {}", device.getDeviceId());
|
||||
String taskKey = device.getDeviceId() + "mobile_position";
|
||||
if (device.getOnline() == 1) {
|
||||
if (device.isOnLine()) {
|
||||
Runnable runnable = dynamicTask.get(taskKey);
|
||||
if (runnable instanceof ISubscribeTask) {
|
||||
ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
|
||||
@@ -356,7 +366,7 @@ public class DeviceServiceImpl implements IDeviceService {
|
||||
|
||||
@Override
|
||||
public void checkDeviceStatus(Device device) {
|
||||
if (device == null || device.getOnline() == 0) {
|
||||
if (device == null || !device.isOnLine()) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
@@ -405,63 +415,11 @@ public class DeviceServiceImpl implements IDeviceService {
|
||||
if (device == null) {
|
||||
return null;
|
||||
}
|
||||
if (parentId == null || parentId.equals(deviceId)) {
|
||||
// 字根节点开始查询
|
||||
List<DeviceChannel> rootNodes = getRootNodes(deviceId, TreeType.CIVIL_CODE.equals(device.getTreeType()), true, !onlyCatalog);
|
||||
return transportChannelsToTree(rootNodes, "");
|
||||
if (ObjectUtils.isEmpty(parentId) || parentId.equals(deviceId)) {
|
||||
parentId = null;
|
||||
}
|
||||
|
||||
if (TreeType.CIVIL_CODE.equals(device.getTreeType())) {
|
||||
if (parentId.length()%2 != 0) {
|
||||
return null;
|
||||
}
|
||||
// 使用行政区划展示树
|
||||
// if (parentId.length() > 10) {
|
||||
// // TODO 可能是行政区划与业务分组混杂的情形
|
||||
// return null;
|
||||
// }
|
||||
|
||||
if (parentId.length() == 10 ) {
|
||||
if (onlyCatalog) {
|
||||
return null;
|
||||
}
|
||||
// parentId为行业编码, 其下不会再有行政区划
|
||||
List<DeviceChannel> channels = deviceChannelMapper.getChannelsByCivilCode(deviceId, parentId);
|
||||
List<BaseTree<DeviceChannel>> trees = transportChannelsToTree(channels, parentId);
|
||||
return trees;
|
||||
}
|
||||
// 查询其下的行政区划和摄像机
|
||||
List<DeviceChannel> channelsForCivilCode = deviceChannelMapper.getChannelsWithCivilCodeAndLength(deviceId, parentId, parentId.length() + 2);
|
||||
if (!onlyCatalog) {
|
||||
List<DeviceChannel> channels = deviceChannelMapper.getChannelsByCivilCode(deviceId, parentId);
|
||||
|
||||
for(DeviceChannel channel : channels) {
|
||||
boolean flag = false;
|
||||
for(DeviceChannel deviceChannel : channelsForCivilCode) {
|
||||
if(channel.getChannelId().equals(deviceChannel.getChannelId())) {
|
||||
flag = true;
|
||||
}
|
||||
}
|
||||
if(!flag) {
|
||||
channelsForCivilCode.add(channel);
|
||||
}
|
||||
}
|
||||
}
|
||||
List<BaseTree<DeviceChannel>> trees = transportChannelsToTree(channelsForCivilCode, parentId);
|
||||
return trees;
|
||||
|
||||
}
|
||||
// 使用业务分组展示树
|
||||
if (TreeType.BUSINESS_GROUP.equals(device.getTreeType())) {
|
||||
if (parentId.length() < 14 ) {
|
||||
return null;
|
||||
}
|
||||
List<DeviceChannel> deviceChannels = deviceChannelMapper.queryChannels(deviceId, parentId, null, null, null,null);
|
||||
List<BaseTree<DeviceChannel>> trees = transportChannelsToTree(deviceChannels, parentId);
|
||||
return trees;
|
||||
}
|
||||
|
||||
return null;
|
||||
List<DeviceChannel> rootNodes = deviceChannelMapper.getSubChannelsByDeviceId(deviceId, parentId, onlyCatalog);
|
||||
return transportChannelsToTree(rootNodes, "");
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -470,42 +428,11 @@ public class DeviceServiceImpl implements IDeviceService {
|
||||
if (device == null) {
|
||||
return null;
|
||||
}
|
||||
if (parentId == null || parentId.equals(deviceId)) {
|
||||
// 字根节点开始查询
|
||||
List<DeviceChannel> rootNodes = getRootNodes(deviceId, TreeType.CIVIL_CODE.equals(device.getTreeType()), false, true);
|
||||
return rootNodes;
|
||||
if (ObjectUtils.isEmpty(parentId) || parentId.equals(deviceId)) {
|
||||
return deviceChannelMapper.getSubChannelsByDeviceId(deviceId, null, false);
|
||||
}else {
|
||||
return deviceChannelMapper.getSubChannelsByDeviceId(deviceId, parentId, false);
|
||||
}
|
||||
|
||||
if (TreeType.CIVIL_CODE.equals(device.getTreeType())) {
|
||||
if (parentId.length()%2 != 0) {
|
||||
return null;
|
||||
}
|
||||
// 使用行政区划展示树
|
||||
if (parentId.length() > 10) {
|
||||
// TODO 可能是行政区划与业务分组混杂的情形
|
||||
return null;
|
||||
}
|
||||
|
||||
if (parentId.length() == 10 ) {
|
||||
// parentId为行业编码, 其下不会再有行政区划
|
||||
List<DeviceChannel> channels = deviceChannelMapper.getChannelsByCivilCode(deviceId, parentId);
|
||||
return channels;
|
||||
}
|
||||
// 查询其下的行政区划和摄像机
|
||||
List<DeviceChannel> channels = deviceChannelMapper.getChannelsByCivilCode(deviceId, parentId);
|
||||
return channels;
|
||||
|
||||
}
|
||||
// 使用业务分组展示树
|
||||
if (TreeType.BUSINESS_GROUP.equals(device.getTreeType())) {
|
||||
if (parentId.length() < 14 ) {
|
||||
return null;
|
||||
}
|
||||
List<DeviceChannel> deviceChannels = deviceChannelMapper.queryChannels(deviceId, parentId, null, null, null,null);
|
||||
return deviceChannels;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private List<BaseTree<DeviceChannel>> transportChannelsToTree(List<DeviceChannel> channels, String parentId) {
|
||||
@@ -525,13 +452,21 @@ public class DeviceServiceImpl implements IDeviceService {
|
||||
node.setPid(parentId);
|
||||
node.setBasicData(channel);
|
||||
node.setParent(false);
|
||||
if (channel.getChannelId().length() > 8) {
|
||||
if (channel.getChannelId().length() > 13) {
|
||||
String gbCodeType = channel.getChannelId().substring(10, 13);
|
||||
node.setParent(gbCodeType.equals(ChannelIdType.BUSINESS_GROUP) || gbCodeType.equals(ChannelIdType.VIRTUAL_ORGANIZATION) );
|
||||
}
|
||||
}else {
|
||||
if (channel.getChannelId().length() <= 8) {
|
||||
node.setParent(true);
|
||||
}else {
|
||||
if (channel.getChannelId().length() != 20) {
|
||||
node.setParent(channel.getParental() == 1);
|
||||
}else {
|
||||
try {
|
||||
int type = Integer.parseInt(channel.getChannelId().substring(10, 13));
|
||||
if (type == 215 || type == 216 || type == 200) {
|
||||
node.setParent(true);
|
||||
}
|
||||
}catch (NumberFormatException e) {
|
||||
node.setParent(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
treeNotes.add(node);
|
||||
}
|
||||
@@ -539,53 +474,6 @@ public class DeviceServiceImpl implements IDeviceService {
|
||||
return treeNotes;
|
||||
}
|
||||
|
||||
private List<DeviceChannel> getRootNodes(String deviceId, boolean isCivilCode, boolean haveCatalog, boolean haveChannel) {
|
||||
if (!haveCatalog && !haveChannel) {
|
||||
return null;
|
||||
}
|
||||
List<DeviceChannel> result = new ArrayList<>();
|
||||
if (isCivilCode) {
|
||||
// 使用行政区划
|
||||
Integer length= deviceChannelMapper.getChannelMinLength(deviceId);
|
||||
if (length == null) {
|
||||
return null;
|
||||
}
|
||||
if (length <= 10) {
|
||||
if (haveCatalog) {
|
||||
List<DeviceChannel> provinceNode = deviceChannelMapper.getChannelsWithCivilCodeAndLength(deviceId, null, length);
|
||||
if (provinceNode != null && provinceNode.size() > 0) {
|
||||
result.addAll(provinceNode);
|
||||
}
|
||||
}
|
||||
|
||||
if (haveChannel) {
|
||||
// 查询那些civilCode不在通道中的不规范通道,放置在根目录
|
||||
List<DeviceChannel> nonstandardNode = deviceChannelMapper.getChannelWithoutCiviCode(deviceId);
|
||||
if (nonstandardNode != null && nonstandardNode.size() > 0) {
|
||||
result.addAll(nonstandardNode);
|
||||
}
|
||||
}
|
||||
}else {
|
||||
if (haveChannel) {
|
||||
List<DeviceChannel> deviceChannels = deviceChannelMapper.queryChannels(deviceId, null, null, null, null,null);
|
||||
if (deviceChannels != null && deviceChannels.size() > 0) {
|
||||
result.addAll(deviceChannels);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}else {
|
||||
// 使用业务分组+虚拟组织
|
||||
|
||||
// 只获取业务分组
|
||||
List<DeviceChannel> deviceChannels = deviceChannelMapper.getBusinessGroups(deviceId, ChannelIdType.BUSINESS_GROUP);
|
||||
if (deviceChannels != null && deviceChannels.size() > 0) {
|
||||
result.addAll(deviceChannels);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isExist(String deviceId) {
|
||||
return deviceMapper.getDeviceByDeviceId(deviceId) != null;
|
||||
@@ -593,7 +481,7 @@ public class DeviceServiceImpl implements IDeviceService {
|
||||
|
||||
@Override
|
||||
public void addDevice(Device device) {
|
||||
device.setOnline(0);
|
||||
device.setOnLine(false);
|
||||
device.setCreateTime(DateUtil.getNow());
|
||||
device.setUpdateTime(DateUtil.getNow());
|
||||
deviceMapper.addCustomDevice(device);
|
||||
@@ -606,6 +494,22 @@ public class DeviceServiceImpl implements IDeviceService {
|
||||
logger.warn("更新设备时未找到设备信息");
|
||||
return;
|
||||
}
|
||||
if(deviceInStore.isSwitchPrimarySubStream() != device.isSwitchPrimarySubStream()){
|
||||
//当修改设备的主子码流开关时,需要校验是否存在流,如果存在流则直接关闭
|
||||
List<SsrcTransaction> ssrcTransactionForAll = streamSession.getSsrcTransactionForAll(device.getDeviceId(), null, null, null);
|
||||
if(ssrcTransactionForAll != null){
|
||||
for (SsrcTransaction ssrcTransaction: ssrcTransactionForAll) {
|
||||
try {
|
||||
cmder.streamByeCmd(device, ssrcTransaction.getChannelId(), ssrcTransaction.getStream(), null, null);
|
||||
} catch (InvalidArgumentException | SsrcTransactionNotFoundException | ParseException | SipException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
deviceChannelMapper.clearPlay(device.getDeviceId());
|
||||
inviteStreamService.clearInviteInfo(device.getDeviceId());
|
||||
}
|
||||
|
||||
if (!ObjectUtils.isEmpty(device.getName())) {
|
||||
deviceInStore.setName(device.getName());
|
||||
}
|
||||
@@ -617,7 +521,6 @@ public class DeviceServiceImpl implements IDeviceService {
|
||||
}
|
||||
deviceInStore.setSdpIp(device.getSdpIp());
|
||||
deviceInStore.setCharset(device.getCharset());
|
||||
deviceInStore.setTreeType(device.getTreeType());
|
||||
|
||||
// 目录订阅相关的信息
|
||||
if (device.getSubscribeCycleForCatalog() > 0) {
|
||||
@@ -673,12 +576,17 @@ public class DeviceServiceImpl implements IDeviceService {
|
||||
}catch (Exception e) {
|
||||
dataSourceTransactionManager.rollback(transactionStatus);
|
||||
}
|
||||
if (result) {
|
||||
redisCatchStorage.removeDevice(deviceId);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResourceBaceInfo getOverview() {
|
||||
return deviceMapper.getOverview();
|
||||
public ResourceBaseInfo getOverview() {
|
||||
List<Device> onlineDevices = deviceMapper.getOnlineDevices();
|
||||
List<Device> all = deviceMapper.getAll();
|
||||
return new ResourceBaseInfo(all.size(), onlineDevices.size());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -110,23 +110,18 @@ public class GbStreamServiceImpl implements IGbStreamService {
|
||||
deviceChannel.setLatitude(gbStream.getLatitude());
|
||||
deviceChannel.setDeviceId(platform.getDeviceGBId());
|
||||
deviceChannel.setManufacture("wvp-pro");
|
||||
deviceChannel.setStatus(gbStream.isStatus()?1:0);
|
||||
deviceChannel.setStatus(gbStream.isStatus());
|
||||
|
||||
deviceChannel.setRegisterWay(1);
|
||||
deviceChannel.setCivilCode(platform.getAdministrativeDivision());
|
||||
|
||||
if (platform.getTreeType().equals(TreeType.CIVIL_CODE)){
|
||||
deviceChannel.setCivilCode(catalogId);
|
||||
}else if (platform.getTreeType().equals(TreeType.BUSINESS_GROUP)){
|
||||
PlatformCatalog catalog = catalogMapper.select(catalogId);
|
||||
if (catalog == null) {
|
||||
deviceChannel.setParentId(platform.getDeviceGBId());
|
||||
deviceChannel.setBusinessGroupId(null);
|
||||
}else {
|
||||
deviceChannel.setParentId(catalog.getId());
|
||||
deviceChannel.setBusinessGroupId(catalog.getBusinessGroupId());
|
||||
}
|
||||
|
||||
PlatformCatalog catalog = catalogMapper.select(catalogId);
|
||||
if (catalog != null) {
|
||||
deviceChannel.setCivilCode(catalog.getCivilCode());
|
||||
deviceChannel.setParentId(catalog.getParentId());
|
||||
deviceChannel.setBusinessGroupId(catalog.getBusinessGroupId());
|
||||
}else {
|
||||
deviceChannel.setCivilCode(platform.getAdministrativeDivision());
|
||||
deviceChannel.setParentId(platform.getDeviceGBId());
|
||||
}
|
||||
|
||||
deviceChannel.setModel("live");
|
||||
@@ -218,23 +213,17 @@ public class GbStreamServiceImpl implements IGbStreamService {
|
||||
}else {
|
||||
status = gbStreamMapper.selectStatusForPush(gbStream.getApp(), gbStream.getStream());
|
||||
}
|
||||
deviceChannel.setStatus((status != null && status )?1:0);
|
||||
deviceChannel.setStatus(status != null && status);
|
||||
|
||||
deviceChannel.setRegisterWay(1);
|
||||
deviceChannel.setCivilCode(platform.getAdministrativeDivision());
|
||||
|
||||
if (platform.getTreeType().equals(TreeType.CIVIL_CODE)){
|
||||
deviceChannel.setCivilCode(catalogId);
|
||||
}else if (platform.getTreeType().equals(TreeType.BUSINESS_GROUP)){
|
||||
PlatformCatalog catalog = catalogMapper.select(catalogId);
|
||||
if (catalog == null) {
|
||||
deviceChannel.setParentId(platform.getDeviceGBId());
|
||||
deviceChannel.setBusinessGroupId(null);
|
||||
}else {
|
||||
deviceChannel.setParentId(catalog.getId());
|
||||
deviceChannel.setBusinessGroupId(catalog.getBusinessGroupId());
|
||||
}
|
||||
|
||||
PlatformCatalog catalog = catalogMapper.select(catalogId);
|
||||
if (catalog != null) {
|
||||
deviceChannel.setCivilCode(catalog.getCivilCode());
|
||||
deviceChannel.setParentId(catalog.getParentId());
|
||||
deviceChannel.setBusinessGroupId(catalog.getBusinessGroupId());
|
||||
}else {
|
||||
deviceChannel.setCivilCode(platform.getAdministrativeDivision());
|
||||
deviceChannel.setParentId(platform.getDeviceGBId());
|
||||
}
|
||||
|
||||
deviceChannel.setModel("live");
|
||||
|
||||
@@ -6,7 +6,7 @@ import com.genersoft.iot.vmp.common.InviteSessionStatus;
|
||||
import com.genersoft.iot.vmp.common.InviteSessionType;
|
||||
import com.genersoft.iot.vmp.common.VideoManagerConstants;
|
||||
import com.genersoft.iot.vmp.service.IInviteStreamService;
|
||||
import com.genersoft.iot.vmp.service.bean.InviteErrorCallback;
|
||||
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
|
||||
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@@ -24,7 +24,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(InviteStreamServiceImpl.class);
|
||||
|
||||
private final Map<String, List<InviteErrorCallback<Object>>> inviteErrorCallbackMap = new ConcurrentHashMap<>();
|
||||
private final Map<String, List<ErrorCallback<Object>>> inviteErrorCallbackMap = new ConcurrentHashMap<>();
|
||||
|
||||
@Autowired
|
||||
private RedisTemplate<Object, Object> redisTemplate;
|
||||
@@ -84,6 +84,24 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
|
||||
redisTemplate.opsForValue().set(key, inviteInfoForUpdate);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InviteInfo updateInviteInfoForStream(InviteInfo inviteInfo, String stream) {
|
||||
|
||||
InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
|
||||
if (inviteInfoInDb == null) {
|
||||
return null;
|
||||
}
|
||||
removeInviteInfo(inviteInfoInDb);
|
||||
String key = VideoManagerConstants.INVITE_PREFIX +
|
||||
"_" + inviteInfo.getType() +
|
||||
"_" + inviteInfo.getDeviceId() +
|
||||
"_" + inviteInfo.getChannelId() +
|
||||
"_" + stream;
|
||||
inviteInfoInDb.setStream(stream);
|
||||
redisTemplate.opsForValue().set(key, inviteInfoInDb);
|
||||
return inviteInfoInDb;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InviteInfo getInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) {
|
||||
String key = VideoManagerConstants.INVITE_PREFIX +
|
||||
@@ -141,9 +159,9 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void once(InviteSessionType type, String deviceId, String channelId, String stream, InviteErrorCallback<Object> callback) {
|
||||
public void once(InviteSessionType type, String deviceId, String channelId, String stream, ErrorCallback<Object> callback) {
|
||||
String key = buildKey(type, deviceId, channelId, stream);
|
||||
List<InviteErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key);
|
||||
List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key);
|
||||
if (callbacks == null) {
|
||||
callbacks = new CopyOnWriteArrayList<>();
|
||||
inviteErrorCallbackMap.put(key, callbacks);
|
||||
@@ -152,19 +170,6 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, Object data) {
|
||||
String key = buildKey(type, deviceId, channelId, stream);
|
||||
List<InviteErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key);
|
||||
if (callbacks == null) {
|
||||
return;
|
||||
}
|
||||
for (InviteErrorCallback<Object> callback : callbacks) {
|
||||
callback.run(code, msg, data);
|
||||
}
|
||||
inviteErrorCallbackMap.remove(key);
|
||||
}
|
||||
|
||||
private String buildKey(InviteSessionType type, String deviceId, String channelId, String stream) {
|
||||
String key = type + "_" + deviceId + "_" + channelId;
|
||||
// 如果ssrc未null那么可以实现一个通道只能一次操作,ssrc不为null则可以支持一个通道多次invite
|
||||
@@ -179,4 +184,46 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
|
||||
public void clearInviteInfo(String deviceId) {
|
||||
removeInviteInfo(null, deviceId, null, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getStreamInfoCount(String mediaServerId) {
|
||||
int count = 0;
|
||||
String key = VideoManagerConstants.INVITE_PREFIX + "_*_*_*_*";
|
||||
List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
|
||||
if (scanResult.size() == 0) {
|
||||
return 0;
|
||||
}else {
|
||||
for (Object keyObj : scanResult) {
|
||||
String keyStr = (String) keyObj;
|
||||
InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(keyStr);
|
||||
if (inviteInfo != null && inviteInfo.getStreamInfo() != null && inviteInfo.getStreamInfo().getMediaServerId().equals(mediaServerId)) {
|
||||
count++;
|
||||
}
|
||||
}
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, Object data) {
|
||||
String key = buildSubStreamKey(type, deviceId, channelId, stream);
|
||||
List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key);
|
||||
if (callbacks == null) {
|
||||
return;
|
||||
}
|
||||
for (ErrorCallback<Object> callback : callbacks) {
|
||||
callback.run(code, msg, data);
|
||||
}
|
||||
inviteErrorCallbackMap.remove(key);
|
||||
}
|
||||
|
||||
|
||||
private String buildSubStreamKey(InviteSessionType type, String deviceId, String channelId, String stream) {
|
||||
String key = type + "_" + "_" + deviceId + "_" + channelId;
|
||||
// 如果ssrc为null那么可以实现一个通道只能一次操作,ssrc不为null则可以支持一个通道多次invite
|
||||
if (stream != null) {
|
||||
key += ("_" + stream);
|
||||
}
|
||||
return key;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
|
||||
import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.ServerKeepaliveData;
|
||||
import com.genersoft.iot.vmp.service.IInviteStreamService;
|
||||
import com.genersoft.iot.vmp.service.IMediaServerService;
|
||||
import com.genersoft.iot.vmp.service.bean.MediaServerLoad;
|
||||
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
|
||||
@@ -97,6 +98,9 @@ public class MediaServerServiceImpl implements IMediaServerService {
|
||||
@Autowired
|
||||
private IRedisCatchStorage redisCatchStorage;
|
||||
|
||||
@Autowired
|
||||
private IInviteStreamService inviteStreamService;
|
||||
|
||||
@Autowired
|
||||
private RedisTemplate<Object, Object> redisTemplate;
|
||||
|
||||
@@ -423,7 +427,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
|
||||
}
|
||||
final String zlmKeepaliveKey = zlmKeepaliveKeyPrefix + serverItem.getId();
|
||||
dynamicTask.stop(zlmKeepaliveKey);
|
||||
dynamicTask.startDelay(zlmKeepaliveKey, new KeepAliveTimeoutRunnable(serverItem), (Math.getExponent(serverItem.getHookAliveInterval()) + 5) * 1000);
|
||||
dynamicTask.startDelay(zlmKeepaliveKey, new KeepAliveTimeoutRunnable(serverItem), (serverItem.getHookAliveInterval().intValue() + 5) * 1000);
|
||||
publisher.zlmOnlineEventPublish(serverItem.getId());
|
||||
|
||||
logger.info("[ZLM] 连接成功 {} - {}:{} ",
|
||||
@@ -694,7 +698,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
|
||||
// 缓存不存在,从数据库查询,如果数据库不存在则是错误的
|
||||
mediaServerItem = getOneFromDatabase(mediaServerId);
|
||||
if (mediaServerItem == null) {
|
||||
logger.warn("[更新ZLM 保活信息]失败,未找到流媒体信息");
|
||||
logger.warn("[更新ZLM 保活信息] 流媒体{}尚未加入使用,请检查节点中是否含有此流媒体 ", mediaServerId);
|
||||
return;
|
||||
}
|
||||
// zlm连接重试
|
||||
@@ -744,7 +748,8 @@ public class MediaServerServiceImpl implements IMediaServerService {
|
||||
result.setId(mediaServerItem.getId());
|
||||
result.setPush(redisCatchStorage.getPushStreamCount(mediaServerItem.getId()));
|
||||
result.setProxy(redisCatchStorage.getProxyStreamCount(mediaServerItem.getId()));
|
||||
result.setGbReceive(redisCatchStorage.getGbReceiveCount(mediaServerItem.getId()));
|
||||
|
||||
result.setGbReceive(inviteStreamService.getStreamInfoCount(mediaServerItem.getId()));
|
||||
result.setGbSend(redisCatchStorage.getGbSendCount(mediaServerItem.getId()));
|
||||
return result;
|
||||
}
|
||||
|
||||
@@ -126,22 +126,17 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
|
||||
List<DeviceChannel> deviceChannelList = new ArrayList<>();
|
||||
if (channelReduces.size() > 0){
|
||||
PlatformCatalog catalog = catalogManager.select(catalogId);
|
||||
if (catalog == null && !catalogId.equals(platform.getDeviceGBId())) {
|
||||
if (catalog == null || !catalogId.equals(platform.getDeviceGBId())) {
|
||||
logger.warn("未查询到目录{}的信息", catalogId);
|
||||
return null;
|
||||
}
|
||||
for (ChannelReduce channelReduce : channelReduces) {
|
||||
DeviceChannel deviceChannel = deviceChannelMapper.queryChannel(channelReduce.getDeviceId(), channelReduce.getChannelId());
|
||||
deviceChannel.setParental(0);
|
||||
deviceChannel.setCivilCode(catalog.getCivilCode());
|
||||
deviceChannel.setParentId(catalog.getParentId());
|
||||
deviceChannel.setBusinessGroupId(catalog.getBusinessGroupId());
|
||||
deviceChannelList.add(deviceChannel);
|
||||
if (platform.getTreeType().equals(TreeType.CIVIL_CODE)){
|
||||
deviceChannel.setCivilCode(catalogId);
|
||||
}else if (platform.getTreeType().equals(TreeType.BUSINESS_GROUP)){
|
||||
deviceChannel.setParentId(catalogId);
|
||||
if (catalog != null) {
|
||||
deviceChannel.setBusinessGroupId(catalog.getBusinessGroupId());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return deviceChannelList;
|
||||
|
||||
@@ -49,6 +49,8 @@ import java.util.UUID;
|
||||
public class PlatformServiceImpl implements IPlatformService {
|
||||
|
||||
private final static String REGISTER_KEY_PREFIX = "platform_register_";
|
||||
|
||||
private final static String REGISTER_FAIL_AGAIN_KEY_PREFIX = "platform_register_fail_again_";
|
||||
private final static String KEEPALIVE_KEY_PREFIX = "platform_keepalive_";
|
||||
|
||||
private final static Logger logger = LoggerFactory.getLogger(PlatformServiceImpl.class);
|
||||
@@ -158,14 +160,6 @@ public class PlatformServiceImpl implements IPlatformService {
|
||||
ParentPlatform parentPlatformOld = platformMapper.getParentPlatById(parentPlatform.getId());
|
||||
ParentPlatformCatch parentPlatformCatchOld = redisCatchStorage.queryPlatformCatchInfo(parentPlatformOld.getServerGBId());
|
||||
parentPlatform.setUpdateTime(DateUtil.getNow());
|
||||
if (!parentPlatformOld.getTreeType().equals(parentPlatform.getTreeType())) {
|
||||
// 目录结构发生变化,清空之前的关联关系
|
||||
logger.info("保存平台{}时发现目录结构变化,清空关联关系", parentPlatform.getDeviceGBId());
|
||||
catalogMapper.delByPlatformId(parentPlatformOld.getServerGBId());
|
||||
platformChannelMapper.delByPlatformId(parentPlatformOld.getServerGBId());
|
||||
platformGbStreamMapper.delByPlatformId(parentPlatformOld.getServerGBId());
|
||||
}
|
||||
|
||||
|
||||
// 停止心跳定时
|
||||
final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatformOld.getServerGBId();
|
||||
@@ -176,12 +170,11 @@ public class PlatformServiceImpl implements IPlatformService {
|
||||
// 注销旧的
|
||||
try {
|
||||
if (parentPlatformOld.isStatus()) {
|
||||
logger.info("保存平台{}时发现救平台在线,发送注销命令", parentPlatformOld.getServerGBId());
|
||||
logger.info("保存平台{}时发现旧平台在线,发送注销命令", parentPlatformOld.getServerGBId());
|
||||
commanderForPlatform.unregister(parentPlatformOld, parentPlatformCatchOld.getSipTransactionInfo(), null, eventResult -> {
|
||||
logger.info("[国标级联] 注销成功, 平台:{}", parentPlatformOld.getServerGBId());
|
||||
});
|
||||
}
|
||||
|
||||
} catch (InvalidArgumentException | ParseException | SipException e) {
|
||||
logger.error("[命令发送失败] 国标级联 注销: {}", e.getMessage());
|
||||
}
|
||||
@@ -214,9 +207,6 @@ public class PlatformServiceImpl implements IPlatformService {
|
||||
logger.error("[命令发送失败] 国标级联: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
// 重新开启定时注册, 使用续订消息
|
||||
// 重新开始心跳保活
|
||||
|
||||
|
||||
return false;
|
||||
}
|
||||
@@ -225,6 +215,9 @@ public class PlatformServiceImpl implements IPlatformService {
|
||||
@Override
|
||||
public void online(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo) {
|
||||
logger.info("[国标级联]:{}, 平台上线", parentPlatform.getServerGBId());
|
||||
final String registerFailAgainTaskKey = REGISTER_FAIL_AGAIN_KEY_PREFIX + parentPlatform.getServerGBId();
|
||||
dynamicTask.stop(registerFailAgainTaskKey);
|
||||
|
||||
platformMapper.updateParentPlatformStatus(parentPlatform.getServerGBId(), true);
|
||||
ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
|
||||
if (parentPlatformCatch == null) {
|
||||
@@ -265,15 +258,9 @@ public class PlatformServiceImpl implements IPlatformService {
|
||||
// 此时是第三次心跳超时, 平台离线
|
||||
if (platformCatch.getKeepAliveReply() == 2) {
|
||||
// 设置平台离线,并重新注册
|
||||
logger.info("[国标级联] {},三次心跳超时后再次发起注册", parentPlatform.getServerGBId());
|
||||
try {
|
||||
commanderForPlatform.register(parentPlatform, eventResult1 -> {
|
||||
logger.info("[国标级联] {},三次心跳超时后再次发起注册仍然失败,开始定时发起注册,间隔为1分钟", parentPlatform.getServerGBId());
|
||||
offline(parentPlatform, false);
|
||||
}, null);
|
||||
} catch (InvalidArgumentException | ParseException | SipException e) {
|
||||
logger.error("[命令发送失败] 国标级联 注册: {}", e.getMessage());
|
||||
}
|
||||
logger.info("[国标级联] 三次心跳超时, 平台{}({})离线", parentPlatform.getName(), parentPlatform.getServerGBId());
|
||||
offline(parentPlatform, false);
|
||||
|
||||
}
|
||||
|
||||
}else {
|
||||
@@ -299,21 +286,22 @@ public class PlatformServiceImpl implements IPlatformService {
|
||||
|
||||
private void registerTask(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo){
|
||||
try {
|
||||
// 设置超时重发, 后续从底层支持消息重发
|
||||
String key = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId() + "_timeout";
|
||||
if (dynamicTask.isAlive(key)) {
|
||||
return;
|
||||
// 不在同一个会话中续订则每次全新注册
|
||||
if (!userSetting.isRegisterKeepIntDialog()) {
|
||||
sipTransactionInfo = null;
|
||||
}
|
||||
dynamicTask.startDelay(key, ()->{
|
||||
registerTask(parentPlatform, sipTransactionInfo);
|
||||
}, 1000);
|
||||
logger.info("[国标级联] 平台:{}注册即将到期,开始续订", parentPlatform.getServerGBId());
|
||||
|
||||
if (sipTransactionInfo == null) {
|
||||
logger.info("[国标级联] 平台:{}注册即将到期,开始重新注册", parentPlatform.getServerGBId());
|
||||
}else {
|
||||
logger.info("[国标级联] 平台:{}注册即将到期,开始续订", parentPlatform.getServerGBId());
|
||||
}
|
||||
|
||||
commanderForPlatform.register(parentPlatform, sipTransactionInfo, eventResult -> {
|
||||
dynamicTask.stop(key);
|
||||
logger.info("[国标级联] 平台:{}注册失败,{}:{}", parentPlatform.getServerGBId(),
|
||||
eventResult.statusCode, eventResult.msg);
|
||||
offline(parentPlatform, false);
|
||||
},eventResult -> {
|
||||
dynamicTask.stop(key);
|
||||
});
|
||||
}, null);
|
||||
} catch (InvalidArgumentException | ParseException | SipException e) {
|
||||
logger.error("[命令发送失败] 国标级联定时注册: {}", e.getMessage());
|
||||
}
|
||||
@@ -334,24 +322,35 @@ public class PlatformServiceImpl implements IPlatformService {
|
||||
// 停止所有推流
|
||||
logger.info("[平台离线] {}, 停止所有推流", parentPlatform.getServerGBId());
|
||||
stopAllPush(parentPlatform.getServerGBId());
|
||||
if (stopRegister) {
|
||||
// 清除注册定时
|
||||
logger.info("[平台离线] {}, 停止定时注册任务", parentPlatform.getServerGBId());
|
||||
final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
|
||||
if (dynamicTask.contains(registerTaskKey)) {
|
||||
dynamicTask.stop(registerTaskKey);
|
||||
}
|
||||
|
||||
// 清除注册定时
|
||||
logger.info("[平台离线] {}, 停止定时注册任务", parentPlatform.getServerGBId());
|
||||
final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
|
||||
if (dynamicTask.contains(registerTaskKey)) {
|
||||
dynamicTask.stop(registerTaskKey);
|
||||
}
|
||||
// 清除心跳定时
|
||||
logger.info("[平台离线] {}, 停止定时发送心跳任务", parentPlatform.getServerGBId());
|
||||
final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId();
|
||||
if (dynamicTask.contains(keepaliveTaskKey)) {
|
||||
// 添加心跳任务
|
||||
// 清除心跳任务
|
||||
dynamicTask.stop(keepaliveTaskKey);
|
||||
}
|
||||
// 停止目录订阅回复
|
||||
logger.info("[平台离线] {}, 停止订阅回复", parentPlatform.getServerGBId());
|
||||
subscribeHolder.removeAllSubscribe(parentPlatform.getServerGBId());
|
||||
// 发起定时自动重新注册
|
||||
if (!stopRegister) {
|
||||
// 设置为60秒自动尝试重新注册
|
||||
final String registerFailAgainTaskKey = REGISTER_FAIL_AGAIN_KEY_PREFIX + parentPlatform.getServerGBId();
|
||||
ParentPlatform platform = platformMapper.getParentPlatById(parentPlatform.getId());
|
||||
if (platform.isEnable()) {
|
||||
dynamicTask.startCron(registerFailAgainTaskKey,
|
||||
()-> registerTask(platform, null),
|
||||
userSetting.getRegisterAgainAfterTime() * 1000);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
private void stopAllPush(String platformId) {
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package com.genersoft.iot.vmp.service.impl;
|
||||
|
||||
import com.alibaba.fastjson2.JSONArray;
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.genersoft.iot.vmp.common.InviteInfo;
|
||||
import com.genersoft.iot.vmp.common.InviteSessionStatus;
|
||||
@@ -29,8 +28,10 @@ import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
|
||||
import com.genersoft.iot.vmp.service.*;
|
||||
import com.genersoft.iot.vmp.service.bean.InviteErrorCallback;
|
||||
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
|
||||
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
|
||||
import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
|
||||
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
|
||||
@@ -57,6 +58,7 @@ import javax.sip.InvalidArgumentException;
|
||||
import javax.sip.ResponseEvent;
|
||||
import javax.sip.SipException;
|
||||
import javax.sip.header.CallIdHeader;
|
||||
import java.io.File;
|
||||
import java.math.BigDecimal;
|
||||
import java.math.RoundingMode;
|
||||
import java.text.ParseException;
|
||||
@@ -144,14 +146,13 @@ public class PlayServiceImpl implements IPlayService {
|
||||
|
||||
|
||||
@Override
|
||||
public SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, InviteErrorCallback<Object> callback) {
|
||||
public SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, ErrorCallback<Object> callback) {
|
||||
if (mediaServerItem == null) {
|
||||
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");
|
||||
}
|
||||
|
||||
Device device = redisCatchStorage.getDevice(deviceId);
|
||||
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
|
||||
|
||||
if (inviteInfo != null ) {
|
||||
if (inviteInfo.getStreamInfo() == null) {
|
||||
// 点播发起了但是尚未成功, 仅注册回调等待结果即可
|
||||
@@ -353,7 +354,7 @@ public class PlayServiceImpl implements IPlayService {
|
||||
|
||||
@Override
|
||||
public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
|
||||
InviteErrorCallback<Object> callback) {
|
||||
ErrorCallback<Object> callback) {
|
||||
|
||||
if (mediaServerItem == null || ssrcInfo == null) {
|
||||
callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
|
||||
@@ -361,8 +362,9 @@ public class PlayServiceImpl implements IPlayService {
|
||||
null);
|
||||
return;
|
||||
}
|
||||
logger.info("[点播开始] deviceId: {}, channelId: {},收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
|
||||
|
||||
logger.info("[点播开始] deviceId: {}, channelId: {},码流类型:{},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}",
|
||||
device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流", ssrcInfo.getPort(),
|
||||
device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
|
||||
//端口获取失败的ssrcInfo 没有必要发送点播指令
|
||||
if (ssrcInfo.getPort() <= 0) {
|
||||
logger.info("[点播端口分配异常],deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo);
|
||||
@@ -377,9 +379,10 @@ public class PlayServiceImpl implements IPlayService {
|
||||
}
|
||||
|
||||
// 初始化redis中的invite消息状态
|
||||
InviteInfo inviteInfo = InviteInfo.getinviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
|
||||
InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
|
||||
mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAY,
|
||||
InviteSessionStatus.ready);
|
||||
inviteInfo.setSubStream(device.isSwitchPrimarySubStream());
|
||||
inviteStreamService.updateInviteInfo(inviteInfo);
|
||||
// 超时处理
|
||||
String timeOutTaskKey = UUID.randomUUID().toString();
|
||||
@@ -387,7 +390,10 @@ public class PlayServiceImpl implements IPlayService {
|
||||
// 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况
|
||||
InviteInfo inviteInfoForTimeOut = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
|
||||
if (inviteInfoForTimeOut == null || inviteInfoForTimeOut.getStreamInfo() == null) {
|
||||
logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, ssrcInfo.getPort(), ssrcInfo.getSsrc());
|
||||
logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流类型:{},端口:{}, SSRC: {}",
|
||||
device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流",
|
||||
ssrcInfo.getPort(), ssrcInfo.getSsrc());
|
||||
|
||||
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
|
||||
// InviteInfo inviteInfoForTimeout = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.play, device.getDeviceId(), channelId);
|
||||
// if (inviteInfoForTimeout == null) {
|
||||
@@ -401,8 +407,8 @@ public class PlayServiceImpl implements IPlayService {
|
||||
callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
|
||||
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
|
||||
InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
|
||||
|
||||
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
|
||||
|
||||
try {
|
||||
cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
|
||||
} catch (InvalidArgumentException | ParseException | SipException |
|
||||
@@ -421,11 +427,11 @@ public class PlayServiceImpl implements IPlayService {
|
||||
}, userSetting.getPlayTimeout());
|
||||
|
||||
try {
|
||||
cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
|
||||
logger.info("收到订阅消息: " + response.toJSONString());
|
||||
cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInuse, hookParam ) -> {
|
||||
logger.info("收到订阅消息: " + hookParam);
|
||||
dynamicTask.stop(timeOutTaskKey);
|
||||
// hook响应
|
||||
StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId);
|
||||
StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInuse, hookParam, device.getDeviceId(), channelId);
|
||||
if (streamInfo == null){
|
||||
callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
|
||||
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
|
||||
@@ -439,7 +445,8 @@ public class PlayServiceImpl implements IPlayService {
|
||||
InviteErrorCode.SUCCESS.getCode(),
|
||||
InviteErrorCode.SUCCESS.getMsg(),
|
||||
streamInfo);
|
||||
logger.info("[点播成功] deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
|
||||
logger.info("[点播成功] deviceId: {}, channelId: {},码流类型:{}", device.getDeviceId(),
|
||||
device.isSwitchPrimarySubStream() ? "辅码流" : "主码流");
|
||||
String streamUrl;
|
||||
if (mediaServerItemInuse.getRtspPort() != 0) {
|
||||
streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp", ssrcInfo.getStream());
|
||||
@@ -505,21 +512,8 @@ public class PlayServiceImpl implements IPlayService {
|
||||
logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
|
||||
if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
|
||||
logger.info("[点播消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
|
||||
if (!ssrcFactory.checkSsrc(mediaServerItem.getId(),ssrcInResponse)) {
|
||||
// ssrc 不可用
|
||||
logger.info("[点播消息] SSRC修正时发现ssrc不可使用 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
|
||||
// 释放ssrc
|
||||
ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
|
||||
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
|
||||
|
||||
callback.run(InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getCode(),
|
||||
InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getMsg(), null);
|
||||
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
|
||||
InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getCode(),
|
||||
InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getMsg(), null);
|
||||
|
||||
return;
|
||||
}
|
||||
// 释放ssrc
|
||||
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
|
||||
// 单端口模式streamId也有变化,重新设置监听即可
|
||||
if (!mediaServerItem.isRtpEnable()) {
|
||||
// 添加订阅
|
||||
@@ -527,12 +521,12 @@ public class PlayServiceImpl implements IPlayService {
|
||||
subscribe.removeSubscribe(hookSubscribe);
|
||||
String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase();
|
||||
hookSubscribe.getContent().put("stream", stream);
|
||||
inviteInfo.setStream(stream);
|
||||
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
|
||||
logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
|
||||
inviteStreamService.updateInviteInfoForStream(inviteInfo, stream);
|
||||
subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
|
||||
logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + hookParam);
|
||||
dynamicTask.stop(timeOutTaskKey);
|
||||
// hook响应
|
||||
StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId);
|
||||
StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInUse, hookParam, device.getDeviceId(), channelId);
|
||||
if (streamInfo == null){
|
||||
callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
|
||||
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
|
||||
@@ -542,7 +536,7 @@ public class PlayServiceImpl implements IPlayService {
|
||||
return;
|
||||
}
|
||||
callback.run(InviteErrorCode.SUCCESS.getCode(),
|
||||
InviteErrorCode.SUCCESS.getMsg(), null);
|
||||
InviteErrorCode.SUCCESS.getMsg(), streamInfo);
|
||||
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
|
||||
InviteErrorCode.SUCCESS.getCode(),
|
||||
InviteErrorCode.SUCCESS.getMsg(),
|
||||
@@ -574,6 +568,9 @@ public class PlayServiceImpl implements IPlayService {
|
||||
"下级自定义了ssrc,重新设置收流信息失败", null);
|
||||
|
||||
}else {
|
||||
if (ssrcInfo.getStream()!= null && !ssrcInfo.getStream().equals(inviteInfo.getStream())) {
|
||||
inviteStreamService.removeInviteInfo(inviteInfo);
|
||||
}
|
||||
ssrcInfo.setSsrc(ssrcInResponse);
|
||||
inviteInfo.setSsrcInfo(ssrcInfo);
|
||||
inviteInfo.setStream(ssrcInfo.getStream());
|
||||
@@ -622,6 +619,8 @@ public class PlayServiceImpl implements IPlayService {
|
||||
@Override
|
||||
public StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId) {
|
||||
StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
|
||||
Device device = redisCatchStorage.getDevice(deviceId);
|
||||
OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
|
||||
if (streamInfo != null) {
|
||||
DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
|
||||
if (deviceChannel != null) {
|
||||
@@ -639,9 +638,9 @@ public class PlayServiceImpl implements IPlayService {
|
||||
|
||||
}
|
||||
|
||||
private StreamInfo onPublishHandlerForPlayback(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String startTime, String endTime) {
|
||||
|
||||
StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
|
||||
private StreamInfo onPublishHandlerForPlayback(MediaServerItem mediaServerItem, HookParam param, String deviceId, String channelId, String startTime, String endTime) {
|
||||
OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) param;
|
||||
StreamInfo streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId);
|
||||
if (streamInfo != null) {
|
||||
streamInfo.setStartTime(startTime);
|
||||
streamInfo.setEndTime(endTime);
|
||||
@@ -698,7 +697,7 @@ public class PlayServiceImpl implements IPlayService {
|
||||
|
||||
@Override
|
||||
public void playBack(String deviceId, String channelId, String startTime,
|
||||
String endTime, InviteErrorCallback<Object> callback) {
|
||||
String endTime, ErrorCallback<Object> callback) {
|
||||
Device device = storager.queryVideoDevice(deviceId);
|
||||
if (device == null) {
|
||||
return;
|
||||
@@ -711,7 +710,7 @@ public class PlayServiceImpl implements IPlayService {
|
||||
@Override
|
||||
public void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,
|
||||
String deviceId, String channelId, String startTime,
|
||||
String endTime, InviteErrorCallback<Object> callback) {
|
||||
String endTime, ErrorCallback<Object> callback) {
|
||||
if (mediaServerItem == null || ssrcInfo == null) {
|
||||
callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
|
||||
InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
|
||||
@@ -727,7 +726,7 @@ public class PlayServiceImpl implements IPlayService {
|
||||
device.getDeviceId(), channelId, startTime, endTime, ssrcInfo.getPort(), device.getStreamMode(),
|
||||
ssrcInfo.getSsrc(), device.isSsrcCheck());
|
||||
// 初始化redis中的invite消息状态
|
||||
InviteInfo inviteInfo = InviteInfo.getinviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
|
||||
InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
|
||||
mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAYBACK,
|
||||
InviteSessionStatus.ready);
|
||||
inviteStreamService.updateInviteInfo(inviteInfo);
|
||||
@@ -760,10 +759,10 @@ public class PlayServiceImpl implements IPlayService {
|
||||
inviteStreamService.removeInviteInfo(inviteInfo);
|
||||
};
|
||||
|
||||
ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, jsonObject) -> {
|
||||
logger.info("收到回放订阅消息: " + jsonObject);
|
||||
ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, hookParam) -> {
|
||||
logger.info("收到回放订阅消息: " + hookParam);
|
||||
dynamicTask.stop(playBackTimeOutTaskKey);
|
||||
StreamInfo streamInfo = onPublishHandlerForPlayback(mediaServerItemInuse, jsonObject, deviceId, channelId, startTime, endTime);
|
||||
StreamInfo streamInfo = onPublishHandlerForPlayback(mediaServerItemInuse, hookParam, deviceId, channelId, startTime, endTime);
|
||||
if (streamInfo == null) {
|
||||
logger.warn("设备回放API调用失败!");
|
||||
callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
|
||||
@@ -829,17 +828,8 @@ public class PlayServiceImpl implements IPlayService {
|
||||
if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
|
||||
logger.info("[录像回放] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
|
||||
|
||||
if (!ssrcFactory.checkSsrc(mediaServerItem.getId(),ssrcInResponse)) {
|
||||
// ssrc 不可用
|
||||
logger.info("[录像回放] SSRC修正时发现ssrc不可使用 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
|
||||
// 释放ssrc
|
||||
dynamicTask.stop(playBackTimeOutTaskKey);
|
||||
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
|
||||
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
|
||||
callback.run(InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getCode(),
|
||||
InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getMsg(), null);
|
||||
return;
|
||||
}
|
||||
// 释放ssrc
|
||||
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
|
||||
|
||||
// 单端口模式streamId也有变化,需要重新设置监听
|
||||
if (!mediaServerItem.isRtpEnable()) {
|
||||
@@ -848,12 +838,12 @@ public class PlayServiceImpl implements IPlayService {
|
||||
subscribe.removeSubscribe(hookSubscribe);
|
||||
String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase();
|
||||
hookSubscribe.getContent().put("stream", stream);
|
||||
inviteInfo.setStream(stream);
|
||||
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
|
||||
logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
|
||||
inviteStreamService.updateInviteInfoForStream(inviteInfo, stream);
|
||||
subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
|
||||
logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + hookParam);
|
||||
dynamicTask.stop(playBackTimeOutTaskKey);
|
||||
// hook响应
|
||||
hookEvent.response(mediaServerItemInUse, response);
|
||||
hookEvent.response(mediaServerItemInUse, hookParam);
|
||||
});
|
||||
}
|
||||
// 更新ssrc
|
||||
@@ -877,6 +867,10 @@ public class PlayServiceImpl implements IPlayService {
|
||||
"下级自定义了ssrc,重新设置收流信息失败", null);
|
||||
|
||||
}else {
|
||||
if (ssrcInfo.getStream()!= null && !ssrcInfo.getStream().equals(inviteInfo.getStream())) {
|
||||
inviteStreamService.removeInviteInfo(inviteInfo);
|
||||
}
|
||||
|
||||
ssrcInfo.setSsrc(ssrcInResponse);
|
||||
inviteInfo.setSsrcInfo(ssrcInfo);
|
||||
inviteInfo.setStream(ssrcInfo.getStream());
|
||||
@@ -900,7 +894,7 @@ public class PlayServiceImpl implements IPlayService {
|
||||
|
||||
|
||||
@Override
|
||||
public void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteErrorCallback<Object> callback) {
|
||||
public void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback) {
|
||||
Device device = storager.queryVideoDevice(deviceId);
|
||||
if (device == null) {
|
||||
return;
|
||||
@@ -918,7 +912,7 @@ public class PlayServiceImpl implements IPlayService {
|
||||
|
||||
|
||||
@Override
|
||||
public void download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteErrorCallback<Object> callback) {
|
||||
public void download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback) {
|
||||
if (mediaServerItem == null || ssrcInfo == null) {
|
||||
callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
|
||||
InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
|
||||
@@ -934,7 +928,7 @@ public class PlayServiceImpl implements IPlayService {
|
||||
}
|
||||
logger.info("[录像下载] deviceId: {}, channelId: {}, 下载速度:{}, 收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, downloadSpeed, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
|
||||
// 初始化redis中的invite消息状态
|
||||
InviteInfo inviteInfo = InviteInfo.getinviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
|
||||
InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
|
||||
mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.DOWNLOAD,
|
||||
InviteSessionStatus.ready);
|
||||
inviteStreamService.updateInviteInfo(inviteInfo);
|
||||
@@ -964,10 +958,10 @@ public class PlayServiceImpl implements IPlayService {
|
||||
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
|
||||
inviteStreamService.removeInviteInfo(inviteInfo);
|
||||
};
|
||||
ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, jsonObject) -> {
|
||||
logger.info("[录像下载]收到订阅消息: " + jsonObject);
|
||||
ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, hookParam) -> {
|
||||
logger.info("[录像下载]收到订阅消息: " + hookParam);
|
||||
dynamicTask.stop(downLoadTimeOutTaskKey);
|
||||
StreamInfo streamInfo = onPublishHandlerForDownload(mediaServerItemInuse, jsonObject, deviceId, channelId, startTime, endTime);
|
||||
StreamInfo streamInfo = onPublishHandlerForDownload(mediaServerItemInuse, hookParam, deviceId, channelId, startTime, endTime);
|
||||
if (streamInfo == null) {
|
||||
logger.warn("[录像下载] 获取流地址信息失败");
|
||||
callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
|
||||
@@ -1032,26 +1026,21 @@ public class PlayServiceImpl implements IPlayService {
|
||||
if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
|
||||
logger.info("[录像下载] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
|
||||
|
||||
if (!ssrcFactory.checkSsrc(mediaServerItem.getId(),ssrcInResponse)) {
|
||||
// ssrc 不可用
|
||||
// 释放ssrc
|
||||
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
|
||||
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
|
||||
callback.run(InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getCode(),
|
||||
InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getMsg(), null);
|
||||
return;
|
||||
}
|
||||
// 释放ssrc
|
||||
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
|
||||
|
||||
// 单端口模式streamId也有变化,需要重新设置监听
|
||||
if (!mediaServerItem.isRtpEnable()) {
|
||||
// 添加订阅
|
||||
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
|
||||
subscribe.removeSubscribe(hookSubscribe);
|
||||
hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
|
||||
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
|
||||
logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
|
||||
String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase();
|
||||
hookSubscribe.getContent().put("stream", stream);
|
||||
inviteStreamService.updateInviteInfoForStream(inviteInfo, stream);
|
||||
subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
|
||||
logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + hookParam);
|
||||
dynamicTask.stop(downLoadTimeOutTaskKey);
|
||||
hookEvent.response(mediaServerItemInUse, response);
|
||||
hookEvent.response(mediaServerItemInUse, hookParam);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1075,6 +1064,9 @@ public class PlayServiceImpl implements IPlayService {
|
||||
"下级自定义了ssrc,重新设置收流信息失败", null);
|
||||
|
||||
}else {
|
||||
if (ssrcInfo.getStream()!= null && !ssrcInfo.getStream().equals(inviteInfo.getStream())) {
|
||||
inviteStreamService.removeInviteInfo(inviteInfo);
|
||||
}
|
||||
ssrcInfo.setSsrc(ssrcInResponse);
|
||||
inviteInfo.setSsrcInfo(ssrcInfo);
|
||||
inviteInfo.setStream(ssrcInfo.getStream());
|
||||
@@ -1083,6 +1075,7 @@ public class PlayServiceImpl implements IPlayService {
|
||||
logger.info("[录像下载] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正");
|
||||
}
|
||||
}
|
||||
inviteStreamService.updateInviteInfo(inviteInfo);
|
||||
});
|
||||
} catch (InvalidArgumentException | SipException | ParseException e) {
|
||||
logger.error("[命令发送失败] 录像下载: {}", e.getMessage());
|
||||
@@ -1141,8 +1134,9 @@ public class PlayServiceImpl implements IPlayService {
|
||||
return null;
|
||||
}
|
||||
|
||||
private StreamInfo onPublishHandlerForDownload(MediaServerItem mediaServerItemInuse, JSONObject response, String deviceId, String channelId, String startTime, String endTime) {
|
||||
StreamInfo streamInfo = onPublishHandler(mediaServerItemInuse, response, deviceId, channelId);
|
||||
private StreamInfo onPublishHandlerForDownload(MediaServerItem mediaServerItemInuse, HookParam hookParam, String deviceId, String channelId, String startTime, String endTime) {
|
||||
OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) hookParam;
|
||||
StreamInfo streamInfo = onPublishHandler(mediaServerItemInuse, streamChangedHookParam, deviceId, channelId);
|
||||
if (streamInfo != null) {
|
||||
streamInfo.setProgress(0);
|
||||
streamInfo.setStartTime(startTime);
|
||||
@@ -1159,15 +1153,14 @@ public class PlayServiceImpl implements IPlayService {
|
||||
}
|
||||
|
||||
|
||||
public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId) {
|
||||
String streamId = resonse.getString("stream");
|
||||
JSONArray tracks = resonse.getJSONArray("tracks");
|
||||
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", streamId, tracks, null);
|
||||
public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, OnStreamChangedHookParam hookParam, String deviceId, String channelId) {
|
||||
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", hookParam.getStream(), hookParam.getTracks(), null);
|
||||
streamInfo.setDeviceID(deviceId);
|
||||
streamInfo.setChannelId(channelId);
|
||||
return streamInfo;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void zlmServerOffline(String mediaServerId) {
|
||||
// 处理正在向上推流的上级平台
|
||||
@@ -1635,4 +1628,52 @@ public class PlayServiceImpl implements IPlayService {
|
||||
}
|
||||
redisCatchStorage.deleteSendRTPServer(device.getDeviceId(), channelId,null, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void getSnap(String deviceId, String channelId, String fileName, ErrorCallback errorCallback) {
|
||||
Device device = deviceService.getDevice(deviceId);
|
||||
if (device == null) {
|
||||
errorCallback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(), InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(), null);
|
||||
return;
|
||||
}
|
||||
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
|
||||
if (inviteInfo != null) {
|
||||
if (inviteInfo.getStreamInfo() != null) {
|
||||
// 已存在线直接截图
|
||||
MediaServerItem mediaServerItemInuse = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
|
||||
String streamUrl;
|
||||
if (mediaServerItemInuse.getRtspPort() != 0) {
|
||||
streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp", inviteInfo.getStreamInfo().getStream());
|
||||
}else {
|
||||
streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.mp4", mediaServerItemInuse.getHttpPort(), "rtp", inviteInfo.getStreamInfo().getStream());
|
||||
}
|
||||
String path = "snap";
|
||||
// 请求截图
|
||||
logger.info("[请求截图]: " + fileName);
|
||||
zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
|
||||
File snapFile = new File(path + File.separator + fileName);
|
||||
if (snapFile.exists()) {
|
||||
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), snapFile.getAbsoluteFile());
|
||||
}else {
|
||||
errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null);
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
|
||||
play(newMediaServerItem, deviceId, channelId, (code, msg, data)->{
|
||||
if (code == InviteErrorCode.SUCCESS.getCode()) {
|
||||
InviteInfo inviteInfoForPlay = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
|
||||
if (inviteInfoForPlay != null && inviteInfoForPlay.getStreamInfo() != null) {
|
||||
getSnap(deviceId, channelId, fileName, errorCallback);
|
||||
}else {
|
||||
errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null);
|
||||
}
|
||||
}else {
|
||||
errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,25 +0,0 @@
|
||||
package com.genersoft.iot.vmp.service.impl;
|
||||
|
||||
import com.genersoft.iot.vmp.service.IRecordInfoServer;
|
||||
import com.genersoft.iot.vmp.storager.dao.RecordInfoDao;
|
||||
import com.genersoft.iot.vmp.storager.dao.dto.RecordInfo;
|
||||
import com.github.pagehelper.PageHelper;
|
||||
import com.github.pagehelper.PageInfo;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Service
|
||||
public class RecordInfoServerImpl implements IRecordInfoServer {
|
||||
|
||||
@Autowired
|
||||
private RecordInfoDao recordInfoDao;
|
||||
|
||||
@Override
|
||||
public PageInfo<RecordInfo> getRecordList(int page, int count) {
|
||||
PageHelper.startPage(page, count);
|
||||
List<RecordInfo> all = recordInfoDao.selectAll();
|
||||
return new PageInfo<>(all);
|
||||
}
|
||||
}
|
||||
@@ -2,12 +2,16 @@ package com.genersoft.iot.vmp.service.impl;
|
||||
|
||||
import com.alibaba.fastjson2.JSONArray;
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.genersoft.iot.vmp.common.GeneralCallback;
|
||||
import com.genersoft.iot.vmp.common.StreamInfo;
|
||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||
import com.genersoft.iot.vmp.conf.exception.ControllerException;
|
||||
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
|
||||
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
|
||||
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
|
||||
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
|
||||
@@ -23,7 +27,7 @@ import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
|
||||
import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper;
|
||||
import com.genersoft.iot.vmp.utils.DateUtil;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaceInfo;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
|
||||
import com.github.pagehelper.PageInfo;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@@ -79,6 +83,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
@Autowired
|
||||
private IMediaServerService mediaServerService;
|
||||
|
||||
@Autowired
|
||||
private ZlmHttpHookSubscribe hookSubscribe;
|
||||
|
||||
@Autowired
|
||||
DataSourceTransactionManager dataSourceTransactionManager;
|
||||
|
||||
@@ -87,7 +94,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
|
||||
|
||||
@Override
|
||||
public StreamInfo save(StreamProxyItem param) {
|
||||
public void save(StreamProxyItem param, GeneralCallback<StreamInfo> callback) {
|
||||
MediaServerItem mediaInfo;
|
||||
if (ObjectUtils.isEmpty(param.getMediaServerId()) || "auto".equals(param.getMediaServerId())){
|
||||
mediaInfo = mediaServerService.getMediaServerForMinimumLoad(null);
|
||||
@@ -98,10 +105,43 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
logger.warn("保存代理未找到在线的ZLM...");
|
||||
throw new ControllerException(ErrorCode.ERROR100.getCode(), "保存代理未找到在线的ZLM");
|
||||
}
|
||||
String dstUrl = String.format("rtmp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtmpPort(), param.getApp(),
|
||||
param.getStream() );
|
||||
param.setDst_url(dstUrl);
|
||||
StringBuffer resultMsg = new StringBuffer();
|
||||
String dstUrl;
|
||||
if ("ffmpeg".equalsIgnoreCase(param.getType())) {
|
||||
JSONObject jsonObject = zlmresTfulUtils.getMediaServerConfig(mediaInfo);
|
||||
if (jsonObject.getInteger("code") != 0) {
|
||||
throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取流媒体配置失败");
|
||||
}
|
||||
JSONArray dataArray = jsonObject.getJSONArray("data");
|
||||
JSONObject mediaServerConfig = dataArray.getJSONObject(0);
|
||||
String ffmpegCmd = mediaServerConfig.getString(param.getFfmpegCmdKey());
|
||||
String schema = getSchemaFromFFmpegCmd(ffmpegCmd);
|
||||
if (schema == null) {
|
||||
throw new ControllerException(ErrorCode.ERROR100.getCode(), "ffmpeg拉流代理无法从ffmpeg cmd中获取到输出格式");
|
||||
}
|
||||
int port;
|
||||
String schemaForUri;
|
||||
if (schema.equalsIgnoreCase("rtsp")) {
|
||||
port = mediaInfo.getRtspPort();
|
||||
schemaForUri = schema;
|
||||
}else if (schema.equalsIgnoreCase("flv")) {
|
||||
port = mediaInfo.getHttpPort();
|
||||
schemaForUri = "http";
|
||||
}else if (schema.equalsIgnoreCase("rtmp")) {
|
||||
port = mediaInfo.getRtmpPort();
|
||||
schemaForUri = schema;
|
||||
}else {
|
||||
port = mediaInfo.getRtmpPort();
|
||||
schemaForUri = schema;
|
||||
}
|
||||
|
||||
dstUrl = String.format("%s://%s:%s/%s/%s", schemaForUri, "127.0.0.1", port, param.getApp(),
|
||||
param.getStream());
|
||||
}else {
|
||||
dstUrl = String.format("rtmp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtmpPort(), param.getApp(),
|
||||
param.getStream());
|
||||
}
|
||||
param.setDstUrl(dstUrl);
|
||||
logger.info("[拉流代理] 输出地址为:{}", dstUrl);
|
||||
param.setMediaServerId(mediaInfo.getId());
|
||||
boolean saveResult;
|
||||
// 更新
|
||||
@@ -111,29 +151,60 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
saveResult = addStreamProxy(param);
|
||||
}
|
||||
if (!saveResult) {
|
||||
throw new ControllerException(ErrorCode.ERROR100.getCode(),"保存失败");
|
||||
callback.run(ErrorCode.ERROR100.getCode(), "保存失败", null);
|
||||
return;
|
||||
}
|
||||
StreamInfo resultForStreamInfo = null;
|
||||
resultMsg.append("保存成功");
|
||||
|
||||
HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId());
|
||||
hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
|
||||
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
|
||||
mediaInfo, param.getApp(), param.getStream(), null, null);
|
||||
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
||||
});
|
||||
|
||||
if (param.isEnable()) {
|
||||
JSONObject jsonObject = addStreamProxyToZlm(param);
|
||||
if (jsonObject == null || jsonObject.getInteger("code") != 0) {
|
||||
resultMsg.append(", 但是启用失败,请检查流地址是否可用");
|
||||
if (jsonObject != null && jsonObject.getInteger("code") == 0) {
|
||||
hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
|
||||
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
|
||||
mediaInfo, param.getApp(), param.getStream(), null, null);
|
||||
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
||||
}else {
|
||||
param.setEnable(false);
|
||||
// 直接移除
|
||||
if (param.isEnable_remove_none_reader()) {
|
||||
if (param.isEnableRemoveNoneReader()) {
|
||||
del(param.getApp(), param.getStream());
|
||||
}else {
|
||||
updateStreamProxy(param);
|
||||
}
|
||||
if (jsonObject == null){
|
||||
callback.run(ErrorCode.ERROR100.getCode(), "记录已保存,启用失败", null);
|
||||
return;
|
||||
}else {
|
||||
callback.run(ErrorCode.ERROR100.getCode(), jsonObject.getString("msg"), null);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}else {
|
||||
resultForStreamInfo = mediaService.getStreamInfoByAppAndStream(
|
||||
mediaInfo, param.getApp(), param.getStream(), null, null);
|
||||
private String getSchemaFromFFmpegCmd(String ffmpegCmd) {
|
||||
ffmpegCmd = ffmpegCmd.replaceAll(" + ", " ");
|
||||
String[] paramArray = ffmpegCmd.split(" ");
|
||||
if (paramArray.length == 0) {
|
||||
return null;
|
||||
}
|
||||
for (int i = 0; i < paramArray.length; i++) {
|
||||
if (paramArray[i].equalsIgnoreCase("-f")) {
|
||||
if (i + 1 < paramArray.length - 1) {
|
||||
return paramArray[i+1];
|
||||
}else {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
return resultForStreamInfo;
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -222,11 +293,11 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
}
|
||||
if ("default".equals(param.getType())){
|
||||
result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl(),
|
||||
param.isEnable_audio(), param.isEnable_mp4(), param.getRtp_type());
|
||||
param.isEnableAudio(), param.isEnableMp4(), param.getRtpType());
|
||||
}else if ("ffmpeg".equals(param.getType())) {
|
||||
result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getSrc_url(), param.getDst_url(),
|
||||
param.getTimeout_ms() + "", param.isEnable_audio(), param.isEnable_mp4(),
|
||||
param.getFfmpeg_cmd_key());
|
||||
result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getSrcUrl(), param.getDstUrl(),
|
||||
param.getTimeoutMs() + "", param.isEnableAudio(), param.isEnableMp4(),
|
||||
param.getFfmpegCmdKey());
|
||||
}
|
||||
return result;
|
||||
}
|
||||
@@ -280,7 +351,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
updateStreamProxy(streamProxy);
|
||||
}else {
|
||||
logger.info("启用代理失败: {}/{}->{}({})", app, stream, jsonObject.getString("msg"),
|
||||
streamProxy.getSrc_url() == null? streamProxy.getUrl():streamProxy.getSrc_url());
|
||||
streamProxy.getSrcUrl() == null? streamProxy.getUrl():streamProxy.getSrcUrl());
|
||||
}
|
||||
}
|
||||
return result;
|
||||
@@ -326,7 +397,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
@Override
|
||||
public void zlmServerOnline(String mediaServerId) {
|
||||
// 移除开启了无人观看自动移除的流
|
||||
List<StreamProxyItem> streamProxyItemList = streamProxyMapper.selecAutoRemoveItemByMediaServerId(mediaServerId);
|
||||
List<StreamProxyItem> streamProxyItemList = streamProxyMapper.selectAutoRemoveItemByMediaServerId(mediaServerId);
|
||||
if (streamProxyItemList.size() > 0) {
|
||||
gbStreamMapper.batchDel(streamProxyItemList);
|
||||
}
|
||||
@@ -354,7 +425,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
@Override
|
||||
public void zlmServerOffline(String mediaServerId) {
|
||||
// 移除开启了无人观看自动移除的流
|
||||
List<StreamProxyItem> streamProxyItemList = streamProxyMapper.selecAutoRemoveItemByMediaServerId(mediaServerId);
|
||||
List<StreamProxyItem> streamProxyItemList = streamProxyMapper.selectAutoRemoveItemByMediaServerId(mediaServerId);
|
||||
if (streamProxyItemList.size() > 0) {
|
||||
gbStreamMapper.batchDel(streamProxyItemList);
|
||||
}
|
||||
@@ -432,7 +503,11 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResourceBaceInfo getOverview() {
|
||||
return streamProxyMapper.getOverview();
|
||||
public ResourceBaseInfo getOverview() {
|
||||
|
||||
int total = streamProxyMapper.getAllCount();
|
||||
int online = streamProxyMapper.getOnline();
|
||||
|
||||
return new ResourceBaseInfo(total, online);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,7 +20,7 @@ import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
|
||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||
import com.genersoft.iot.vmp.storager.dao.*;
|
||||
import com.genersoft.iot.vmp.utils.DateUtil;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaceInfo;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
|
||||
import com.github.pagehelper.PageHelper;
|
||||
import com.github.pagehelper.PageInfo;
|
||||
import org.slf4j.Logger;
|
||||
@@ -183,6 +183,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
|
||||
|
||||
@Override
|
||||
public boolean stop(String app, String streamId) {
|
||||
logger.info("[推流 ] 停止流: {}/{}", app, streamId);
|
||||
StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId);
|
||||
if (streamPushItem != null) {
|
||||
gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL);
|
||||
@@ -531,7 +532,10 @@ public class StreamPushServiceImpl implements IStreamPushService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResourceBaceInfo getOverview() {
|
||||
return streamPushMapper.getOverview(userSetting.isUsePushingAsStatus());
|
||||
public ResourceBaseInfo getOverview() {
|
||||
int total = streamPushMapper.getAllCount();
|
||||
int online = streamPushMapper.getAllOnline(userSetting.isUsePushingAsStatus());
|
||||
|
||||
return new ResourceBaseInfo(total, online);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,7 +9,6 @@ import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto;
|
||||
import com.google.common.collect.BiMap;
|
||||
import com.google.common.collect.HashBiMap;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
@@ -33,38 +32,43 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPus
|
||||
/**
|
||||
* 用于存储不加过滤的所有数据
|
||||
*/
|
||||
private List<StreamPushItem> streamPushItems = new ArrayList<>();
|
||||
private final List<StreamPushItem> streamPushItems = new ArrayList<>();
|
||||
|
||||
/**
|
||||
* 用于存储更具APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表
|
||||
*/
|
||||
private Map<String,StreamPushItem> streamPushItemForSave = new HashMap<>();
|
||||
private final Map<String,StreamPushItem> streamPushItemForSave = new HashMap<>();
|
||||
|
||||
/**
|
||||
* 用于存储按照APP+Stream为KEY, 平台ID+目录Id 为value的数据,用于存储到gb_stream表后获取app+Stream对应的平台与目录信息,然后存入关联表
|
||||
*/
|
||||
private Map<String, List<String[]>> streamPushItemsForPlatform = new HashMap<>();
|
||||
private final Map<String, List<String[]>> streamPushItemsForPlatform = new HashMap<>();
|
||||
|
||||
/**
|
||||
* 用于判断文件是否存在重复的app+Stream+平台ID
|
||||
*/
|
||||
private Set<String> streamPushStreamSet = new HashSet<>();
|
||||
private final Set<String> streamPushStreamSet = new HashSet<>();
|
||||
|
||||
/**
|
||||
* 用于存储APP+Stream->国标ID 的数据结构, 数据一一对应,全局判断APP+Stream->国标ID是否存在不对应
|
||||
*/
|
||||
private BiMap<String,String> gBMap = HashBiMap.create();
|
||||
private final BiMap<String,String> gBMap = HashBiMap.create();
|
||||
|
||||
/**
|
||||
* 用于存储APP+Stream-> 在数据库中的数据
|
||||
*/
|
||||
private final BiMap<String,String> pushMapInDb = HashBiMap.create();
|
||||
|
||||
/**
|
||||
* 记录错误的APP+Stream
|
||||
*/
|
||||
private List<String> errorStreamList = new ArrayList<>();
|
||||
private final List<String> errorStreamList = new ArrayList<>();
|
||||
|
||||
|
||||
/**
|
||||
* 记录错误的国标ID
|
||||
*/
|
||||
private List<String> errorGBList = new ArrayList<>();
|
||||
private final List<String> errorInfoList = new ArrayList<>();
|
||||
|
||||
/**
|
||||
* 读取数量计数器
|
||||
@@ -75,6 +79,13 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPus
|
||||
this.pushService = pushService;
|
||||
this.defaultMediaServerId = defaultMediaServerId;
|
||||
this.errorDataHandler = errorDataHandler;
|
||||
// 获取数据库已有的数据,已经存在的则忽略
|
||||
List<String> allAppAndStreams = pushService.getAllAppAndStream();
|
||||
if (allAppAndStreams.size() > 0) {
|
||||
for (String allAppAndStream : allAppAndStreams) {
|
||||
pushMapInDb.put(allAppAndStream, allAppAndStream);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public interface ErrorDataHandler{
|
||||
@@ -88,26 +99,30 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPus
|
||||
|| ObjectUtils.isEmpty(streamPushExcelDto.getGbId())) {
|
||||
return;
|
||||
}
|
||||
Integer rowIndex = analysisContext.readRowHolder().getRowIndex();
|
||||
|
||||
if (gBMap.get(streamPushExcelDto.getApp() + streamPushExcelDto.getStream()) == null) {
|
||||
try {
|
||||
gBMap.put(streamPushExcelDto.getApp() + streamPushExcelDto.getStream(), streamPushExcelDto.getGbId());
|
||||
}catch (IllegalArgumentException e) {
|
||||
errorGBList.add(streamPushExcelDto.getGbId() + "(不同的app+stream使用了相同的国标ID)");
|
||||
errorInfoList.add("行:" + rowIndex + ", " + streamPushExcelDto.getGbId() + " 国标ID重复使用");
|
||||
return;
|
||||
}
|
||||
}else {
|
||||
if (!gBMap.get(streamPushExcelDto.getApp() + streamPushExcelDto.getStream()).equals(streamPushExcelDto.getGbId())) {
|
||||
errorGBList.add(streamPushExcelDto.getGbId() + "(同一组app+stream使用了不同的国标ID)");
|
||||
errorInfoList.add("行:" + rowIndex + ", " + streamPushExcelDto.getGbId() + " 同样的应用名和流ID使用了不同的国标ID");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (streamPushStreamSet.contains(streamPushExcelDto.getApp() + streamPushExcelDto.getStream() + streamPushExcelDto.getPlatformId())) {
|
||||
errorStreamList.add(streamPushExcelDto.getApp() + "/" + streamPushExcelDto.getStream()+ "/" +
|
||||
streamPushExcelDto.getPlatformId() + "(同一组app+stream添加在了同一个平台下)");
|
||||
errorStreamList.add("行:" + rowIndex + ", " + streamPushExcelDto.getApp() + "/" + streamPushExcelDto.getStream()+ " 平台信息重复");
|
||||
return;
|
||||
}else {
|
||||
if (pushMapInDb.get(streamPushExcelDto.getApp()+streamPushExcelDto.getStream()) != null) {
|
||||
errorStreamList.add("行:" + rowIndex + ", " + streamPushExcelDto.getApp() + "/" + streamPushExcelDto.getStream()+ " 数据已存在");
|
||||
return;
|
||||
}
|
||||
streamPushStreamSet.add(streamPushExcelDto.getApp()+streamPushExcelDto.getStream() + streamPushExcelDto.getPlatformId());
|
||||
}
|
||||
|
||||
@@ -165,7 +180,7 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPus
|
||||
gBMap.clear();
|
||||
streamPushStreamSet.clear();
|
||||
streamPushItemsForPlatform.clear();
|
||||
errorDataHandler.handle(errorStreamList, errorGBList);
|
||||
errorDataHandler.handle(errorStreamList, errorInfoList);
|
||||
}
|
||||
|
||||
private void saveData(){
|
||||
|
||||
@@ -7,8 +7,7 @@ import com.github.pagehelper.PageHelper;
|
||||
import com.github.pagehelper.PageInfo;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.springframework.util.DigestUtils;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@@ -61,11 +60,22 @@ public class UserServiceImpl implements IUserService {
|
||||
|
||||
@Override
|
||||
public boolean checkPushAuthority(String callId, String sign) {
|
||||
if (ObjectUtils.isEmpty(callId)) {
|
||||
return userMapper.checkPushAuthorityByCallId(sign).size() > 0;
|
||||
}else {
|
||||
return userMapper.checkPushAuthorityByCallIdAndSign(callId, sign).size() > 0;
|
||||
|
||||
List<User> users = userMapper.getUsers();
|
||||
if (users.size() == 0) {
|
||||
return false;
|
||||
}
|
||||
for (User user : users) {
|
||||
if (user.getPushKey() == null) {
|
||||
continue;
|
||||
}
|
||||
String checkStr = callId == null? user.getPushKey():(callId + "_" + user.getPushKey()) ;
|
||||
String checkSign = DigestUtils.md5DigestAsHex(checkStr.getBytes());
|
||||
if (checkSign.equals(sign)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -0,0 +1,59 @@
|
||||
package com.genersoft.iot.vmp.service.redisMsg;
|
||||
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.genersoft.iot.vmp.service.IStreamPushService;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.data.redis.connection.Message;
|
||||
import org.springframework.data.redis.connection.MessageListener;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
/**
|
||||
* 接收来自redis的关闭流更新通知
|
||||
* @author lin
|
||||
*/
|
||||
@Component
|
||||
public class RedisCloseStreamMsgListener implements MessageListener {
|
||||
|
||||
private final static Logger logger = LoggerFactory.getLogger(RedisCloseStreamMsgListener.class);
|
||||
|
||||
|
||||
@Autowired
|
||||
private IStreamPushService pushService;
|
||||
|
||||
private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
|
||||
|
||||
@Qualifier("taskExecutor")
|
||||
@Autowired
|
||||
private ThreadPoolTaskExecutor taskExecutor;
|
||||
|
||||
@Override
|
||||
public void onMessage(@NotNull Message message, byte[] bytes) {
|
||||
boolean isEmpty = taskQueue.isEmpty();
|
||||
taskQueue.offer(message);
|
||||
if (isEmpty) {
|
||||
taskExecutor.execute(() -> {
|
||||
while (!taskQueue.isEmpty()) {
|
||||
Message msg = taskQueue.poll();
|
||||
try {
|
||||
JSONObject jsonObject = JSON.parseObject(msg.getBody());
|
||||
String app = jsonObject.getString("app");
|
||||
String stream = jsonObject.getString("stream");
|
||||
pushService.stop(app, stream);
|
||||
|
||||
}catch (Exception e) {
|
||||
logger.warn("[REDIS的关闭推流通知] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
|
||||
logger.error("[REDIS的关闭推流通知] 异常内容: ", e);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -289,7 +289,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
|
||||
// 添加订阅
|
||||
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(content.getApp(), content.getStream(), true, "rtsp", mediaServerItem.getId());
|
||||
|
||||
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
|
||||
subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam)->{
|
||||
dynamicTask.stop(taskKey);
|
||||
responseSendItem(mediaServerItem, content, toId, serial);
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user