diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index 470533b25..33f5f0ca1 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -16,7 +16,7 @@ public class VideoManagerConstants { public static final String ONLINE_MEDIA_SERVERS_PREFIX = "VMP_ONLINE_MEDIA_SERVERS:"; - public static final String DEVICE_PREFIX = "VMP_DEVICE:"; + public static final String DEVICE_PREFIX = "VMP_DEVICE_INFO"; public static final String INVITE_PREFIX = "VMP_INVITE_INFO"; @@ -24,7 +24,7 @@ public class VideoManagerConstants { public static final String PLATFORM_REGISTER_INFO_PREFIX = "VMP_PLATFORM_REGISTER_INFO_"; - public static final String SEND_RTP_INFO = "VMP_SEND_RTP_INFO:"; + public static final String SEND_RTP_PORT = "VM_SEND_RTP_PORT:"; public static final String SEND_RTP_INFO_CALLID = "VMP_SEND_RTP_INFO:CALL_ID:"; public static final String SEND_RTP_INFO_STREAM = "VMP_SEND_RTP_INFO:STREAM:"; public static final String SEND_RTP_INFO_CHANNEL = "VMP_SEND_RTP_INFO:CHANNEL:"; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java index fd640ab54..37b996878 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java @@ -151,9 +151,6 @@ public interface DeviceChannelMapper { " "}) List queryChannelsWithDeviceInfo(@Param("deviceId") String deviceId, @Param("parentChannelId") String parentChannelId, @Param("query") String query, @Param("hasSubChannel") Boolean hasSubChannel, @Param("online") Boolean online, @Param("channelIds") List channelIds); - @Update(value = {"UPDATE wvp_device_channel SET stream_id=null WHERE device_db_id=#{deviceId} AND device_id=#{channelId}"}) - void stopPlay(@Param("deviceId") int deviceId, @Param("channelId") String channelId); - @Update(value = {"UPDATE wvp_device_channel SET stream_id=#{streamId} WHERE id=#{channelId}"}) void startPlay(@Param("channelId") Integer channelId, @Param("streamId") String streamId); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java index 8355f2a99..7d4c876a9 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java @@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.gb28181.event.device.RequestTimeoutEvent; import com.genersoft.iot.vmp.gb28181.event.record.RecordEndEvent; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition.MobilePositionEvent; +import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOfflineEvent; import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOnlineEvent; import org.springframework.beans.factory.annotation.Autowired; @@ -39,15 +40,15 @@ public class EventPublisher { applicationEventPublisher.publishEvent(alarmEvent); } - public void mediaServerOfflineEventPublish(String mediaServerId){ + public void mediaServerOfflineEventPublish(MediaServer mediaServer){ MediaServerOfflineEvent outEvent = new MediaServerOfflineEvent(this); - outEvent.setMediaServerId(mediaServerId); + outEvent.setMediaServer(mediaServer); applicationEventPublisher.publishEvent(outEvent); } - public void mediaServerOnlineEventPublish(String mediaServerId) { + public void mediaServerOnlineEventPublish(MediaServer mediaServer) { MediaServerOnlineEvent outEvent = new MediaServerOnlineEvent(this); - outEvent.setMediaServerId(mediaServerId); + outEvent.setMediaServer(mediaServer); applicationEventPublisher.publishEvent(outEvent); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java index 7f8bc0142..aa2ca3ab6 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java @@ -30,13 +30,13 @@ public interface IPlayService { MediaServer getNewMediaServerItem(Device device); void playBack(Device device, DeviceChannel channel, String startTime, String endTime, ErrorCallback callback); - void zlmServerOffline(String mediaServerId); + void zlmServerOffline(MediaServer mediaServer); void download(Device device, DeviceChannel channel, String startTime, String endTime, int downloadSpeed, ErrorCallback callback); StreamInfo getDownLoadInfo(Device device, DeviceChannel channel, String stream); - void zlmServerOnline(String mediaServerId); + void zlmServerOnline(MediaServer mediaServer); AudioBroadcastResult audioBroadcast(Device device, DeviceChannel deviceChannel, Boolean broadcastMode); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java index 925788f77..bc73a265d 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java @@ -210,9 +210,6 @@ public class InviteStreamServiceImpl implements IInviteStreamService { ":" + inviteInfo.getSsrcInfo().getSsrc(); redisTemplate.opsForHash().delete(key, objectKey); } - if (redisTemplate.opsForHash().size(key) == 0) { - redisTemplate.opsForHash().delete(key); - } } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java index 005fda05f..54e7cc2ac 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java @@ -246,32 +246,6 @@ public class PlatformServiceImpl implements IPlatformService { return false; } - private void unregister(Platform platform) { - // 停止心跳定时 - final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + platform.getServerGBId(); - dynamicTask.stop(keepaliveTaskKey); - // 停止注册定时 - final String registerTaskKey = REGISTER_KEY_PREFIX + platform.getServerGBId(); - dynamicTask.stop(registerTaskKey); - - PlatformCatch platformCatchOld = redisCatchStorage.queryPlatformCatchInfo(platform.getServerGBId()); - // 注销旧的 - try { - if (platform.isStatus()) { - commanderForPlatform.unregister(platform, platformCatchOld.getSipTransactionInfo(), null, eventResult -> { - log.info("[国标级联] 注销命令发送成功,平台:{}", platform.getServerGBId()); - }); - } - } catch (InvalidArgumentException | ParseException | SipException e) { - log.error("[命令发送失败] 国标级联 注销: {}", e.getMessage()); - } - } - - private void register(Platform platform) { - - } - - @Override public void online(Platform platform, SipTransactionInfo sipTransactionInfo) { log.info("[国标级联]:{}, 平台上线", platform.getServerGBId()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java index 5ecd4b6e7..819213b31 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java @@ -1157,12 +1157,12 @@ public class PlayServiceImpl implements IPlayService { @Override - public void zlmServerOffline(String mediaServerId) { + public void zlmServerOffline(MediaServer mediaServer) { // 处理正在向上推流的上级平台 List sendRtpInfos = sendRtpServerService.queryAll(); if (!sendRtpInfos.isEmpty()) { for (SendRtpInfo sendRtpInfo : sendRtpInfos) { - if (sendRtpInfo.getMediaServerId().equals(mediaServerId) && sendRtpInfo.isSendToPlatform()) { + if (sendRtpInfo.getMediaServerId().equals(mediaServer.getId()) && sendRtpInfo.isSendToPlatform()) { Platform platform = platformService.queryPlatformByServerGBId(sendRtpInfo.getTargetId()); CommonGBChannel channel = channelService.getOne(sendRtpInfo.getChannelId()); try { @@ -1177,7 +1177,7 @@ public class PlayServiceImpl implements IPlayService { List allSsrc = sessionManager.getAll(); if (allSsrc.size() > 0) { for (SsrcTransaction ssrcTransaction : allSsrc) { - if (ssrcTransaction.getMediaServerId().equals(mediaServerId)) { + if (ssrcTransaction.getMediaServerId().equals(mediaServer.getId())) { Device device = deviceService.getDeviceByDeviceId(ssrcTransaction.getDeviceId()); if (device == null) { continue; @@ -1314,7 +1314,22 @@ public class PlayServiceImpl implements IPlayService { } @Override - public void zlmServerOnline(String mediaServerId) { + public void zlmServerOnline(MediaServer mediaServer) { + // 获取 + List inviteInfoList = inviteStreamService.getAllInviteInfo(); + if (inviteInfoList.isEmpty()) { + return; + } + + List rtpServerList = mediaServerService.listRtpServer(mediaServer); + if (rtpServerList.isEmpty()) { + return; + } + for (InviteInfo inviteInfo : inviteInfoList) { + if (!rtpServerList.contains(inviteInfo.getStream())){ + inviteStreamService.removeInviteInfo(inviteInfo); + } + } } @Override @@ -1568,7 +1583,10 @@ public class PlayServiceImpl implements IPlayService { public void stop(InviteSessionType type, Device device, DeviceChannel channel, String stream) { InviteInfo inviteInfo = inviteStreamService.getInviteInfo(type, channel.getId(), stream); if (inviteInfo == null) { - throw new ControllerException(ErrorCode.ERROR100.getCode(), "点播未找到"); + if (type == InviteSessionType.PLAY) { + deviceChannelService.stopPlay(channel.getId()); + } + return; } inviteStreamService.removeInviteInfo(inviteInfo); if (InviteSessionStatus.ok == inviteInfo.getStatus()) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/MediaServerConfig.java b/src/main/java/com/genersoft/iot/vmp/media/MediaServerConfig.java index 1c6bda3d5..b9f7c08ef 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/MediaServerConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/media/MediaServerConfig.java @@ -41,7 +41,7 @@ public class MediaServerConfig implements CommandLineRunner { mediaServerService.update(mediaSerItemInConfig); }else { if (defaultMediaServer != null) { - mediaServerService.delete(defaultMediaServer.getId()); + mediaServerService.delete(defaultMediaServer); } MediaServer mediaServerItem = mediaServerService.getOneFromDatabase(mediaSerItemInConfig.getId()); if (mediaServerItem == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerEventAbstract.java b/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerEventAbstract.java index a9bf76981..8fa9953d3 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerEventAbstract.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerEventAbstract.java @@ -1,24 +1,23 @@ package com.genersoft.iot.vmp.media.event.mediaServer; +import com.genersoft.iot.vmp.media.bean.MediaServer; +import lombok.Getter; +import lombok.Setter; import org.springframework.context.ApplicationEvent; + public abstract class MediaServerEventAbstract extends ApplicationEvent { private static final long serialVersionUID = 1L; - private String mediaServerId; + @Getter + @Setter + private MediaServer mediaServer; public MediaServerEventAbstract(Object source) { super(source); } - public String getMediaServerId() { - return mediaServerId; - } - - public void setMediaServerId(String mediaServerId) { - this.mediaServerId = mediaServerId; - } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerStatusEventListener.java b/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerStatusEventListener.java index e30ffea40..2e5a6ea08 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerStatusEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerStatusEventListener.java @@ -25,16 +25,16 @@ public class MediaServerStatusEventListener { @Async("taskExecutor") @EventListener public void onApplicationEvent(MediaServerOnlineEvent event) { - log.info("[媒体节点] 上线 ID:" + event.getMediaServerId()); - playService.zlmServerOnline(event.getMediaServerId()); + log.info("[媒体节点] 上线 ID:" + event.getMediaServer().getId()); + playService.zlmServerOnline(event.getMediaServer()); } @Async("taskExecutor") @EventListener public void onApplicationEvent(MediaServerOfflineEvent event) { - log.info("[媒体节点] 离线,ID:" + event.getMediaServerId()); + log.info("[媒体节点] 离线,ID:" + event.getMediaServer().getId()); // 处理ZLM离线 - playService.zlmServerOffline(event.getMediaServerId()); + playService.zlmServerOffline(event.getMediaServer()); } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java index eb3a2cb1e..c5b9d0d6b 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java @@ -67,4 +67,7 @@ public interface IMediaNodeServerService { StreamInfo startProxy(MediaServer mediaServer, StreamProxy streamProxy); void stopProxy(MediaServer mediaServer, String streamKey); + + List listRtpServer(MediaServer mediaServer); + } diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java index 3d1f7880c..9dcbe20e9 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java @@ -65,7 +65,7 @@ public interface IMediaServerService { boolean checkMediaRecordServer(String ip, int port); - void delete(String id); + void delete(MediaServer mediaServer); MediaServer getDefaultMediaServer(); @@ -158,4 +158,5 @@ public interface IMediaServerService { int createRTPServer(MediaServer mediaServerItem, String streamId, long ssrc, Integer port, boolean onlyAuto, boolean disableAudio, boolean reUsePort, Integer tcpMode); + List listRtpServer(MediaServer mediaServer); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index 30cd87473..be99b226a 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -81,6 +81,7 @@ public class MediaServerServiceImpl implements IMediaServerService { @Autowired private MediaConfig mediaConfig; + /** * 流到来的处理 */ @@ -216,6 +217,16 @@ public class MediaServerServiceImpl implements IMediaServerService { return rtpServerPort; } + @Override + public List listRtpServer(MediaServer mediaServer) { + IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); + if (mediaNodeServerService == null) { + log.info("[openRTPServer] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType()); + return new ArrayList<>(); + } + return mediaNodeServerService.listRtpServer(mediaServer); + } + @Override public void closeRTPServer(MediaServer mediaServer, String streamId) { if (mediaServer == null) { @@ -561,14 +572,14 @@ public class MediaServerServiceImpl implements IMediaServerService { } @Override - public void delete(String id) { - mediaServerMapper.delOne(id); - redisTemplate.opsForZSet().remove(VideoManagerConstants.ONLINE_MEDIA_SERVERS_PREFIX + userSetting.getServerId(), id); - String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + ":" + id; + public void delete(MediaServer mediaServer) { + mediaServerMapper.delOne(mediaServer.getId()); + redisTemplate.opsForZSet().remove(VideoManagerConstants.ONLINE_MEDIA_SERVERS_PREFIX + userSetting.getServerId(), mediaServer.getId()); + String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + ":" + mediaServer.getId(); redisTemplate.delete(key); // 发送节点移除通知 MediaServerDeleteEvent event = new MediaServerDeleteEvent(this); - event.setMediaServerId(id); + event.setMediaServer(mediaServer); applicationEventPublisher.publishEvent(event); } @@ -589,7 +600,7 @@ public class MediaServerServiceImpl implements IMediaServerService { for (MediaServer mediaServer : allInCatch) { // 清除数据中不存在但redis缓存数据 if (!mediaServerMap.containsKey(mediaServer.getId())) { - delete(mediaServer.getId()); + delete(mediaServer); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/SendRtpPortManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/SendRtpPortManager.java deleted file mode 100755 index b756b9ece..000000000 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/SendRtpPortManager.java +++ /dev/null @@ -1,133 +0,0 @@ -package com.genersoft.iot.vmp.media.zlm; - -import com.genersoft.iot.vmp.common.VideoManagerConstants; -import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo; -import com.genersoft.iot.vmp.media.bean.MediaServer; -import com.genersoft.iot.vmp.utils.redis.RedisUtil; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.math.NumberUtils; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.data.redis.support.atomic.RedisAtomicInteger; -import org.springframework.stereotype.Component; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -@Slf4j -@Component -public class SendRtpPortManager { - - @Autowired - private UserSetting userSetting; - - @Autowired - private RedisTemplate redisTemplate; - - private final String KEY = "VM_MEDIA_SEND_RTP_PORT_"; - - public synchronized int getNextPort(MediaServer mediaServer) { - if (mediaServer == null) { - log.warn("[发送端口管理] 参数错误,mediaServer为NULL"); - return -1; - } - String sendIndexKey = KEY + userSetting.getServerId() + "_" + mediaServer.getId(); - String key = VideoManagerConstants.SEND_RTP_INFO - + userSetting.getServerId() + "_*"; - List queryResult = RedisUtil.scan(redisTemplate, key); - Map sendRtpItemMap = new HashMap<>(); - - for (Object o : queryResult) { - SendRtpInfo sendRtpItem = (SendRtpInfo) redisTemplate.opsForValue().get(o); - if (sendRtpItem != null) { - sendRtpItemMap.put(sendRtpItem.getLocalPort(), sendRtpItem); - } - } - String sendRtpPortRange = mediaServer.getSendRtpPortRange(); - int startPort; - int endPort; - if (sendRtpPortRange != null) { - String[] portArray = sendRtpPortRange.split(","); - if (portArray.length != 2 || !NumberUtils.isParsable(portArray[0]) || !NumberUtils.isParsable(portArray[1])) { - log.warn("{}发送端口配置格式错误,自动使用50000-60000作为端口范围", mediaServer.getId()); - startPort = 50000; - endPort = 60000; - }else { - if ( Integer.parseInt(portArray[1]) - Integer.parseInt(portArray[0]) < 1) { - log.warn("{}发送端口配置错误,结束端口至少比开始端口大一,自动使用50000-60000作为端口范围", mediaServer.getId()); - startPort = 50000; - endPort = 60000; - }else { - startPort = Integer.parseInt(portArray[0]); - endPort = Integer.parseInt(portArray[1]); - } - } - }else { - log.warn("{}未设置发送端口默认值,自动使用50000-60000作为端口范围", mediaServer.getId()); - startPort = 50000; - endPort = 60000; - } - if (redisTemplate == null || redisTemplate.getConnectionFactory() == null) { - log.warn("{}获取redis连接信息失败", mediaServer.getId()); - return -1; - } -// RedisAtomicInteger redisAtomicInteger = new RedisAtomicInteger(sendIndexKey , redisTemplate.getConnectionFactory()); -// return redisAtomicInteger.getAndUpdate((current)->{ -// return getPort(current, startPort, endPort, checkPort-> !sendRtpItemMap.containsKey(checkPort)); -// }); - return getSendPort(startPort, endPort, sendIndexKey, sendRtpItemMap); - } - - private synchronized int getSendPort(int startPort, int endPort, String sendIndexKey, Map sendRtpItemMap){ - // TODO 这里改为只取偶数端口 - RedisAtomicInteger redisAtomicInteger = new RedisAtomicInteger(sendIndexKey , redisTemplate.getConnectionFactory()); - if (redisAtomicInteger.get() < startPort) { - redisAtomicInteger.set(startPort); - return startPort; - }else { - int port = redisAtomicInteger.getAndIncrement(); - if (port > endPort) { - redisAtomicInteger.set(startPort); - if (sendRtpItemMap.containsKey(startPort)) { - return getSendPort(startPort, endPort, sendIndexKey, sendRtpItemMap); - }else { - return startPort; - } - } - if (sendRtpItemMap.containsKey(port)) { - return getSendPort(startPort, endPort, sendIndexKey, sendRtpItemMap); - }else { - return port; - } - } - } - - interface CheckPortCallback{ - boolean check(int port); - } - - private int getPort(int current, int start, int end, CheckPortCallback checkPortCallback) { - if (current <= 0) { - if (start%2 == 0) { - current = start; - }else { - current = start + 1; - } - }else { - current += 2; - if (current > end) { - if (start%2 == 0) { - current = start; - }else { - current = start + 1; - } - } - } - if (!checkPortCallback.check(current)) { - return getPort(current + 2, start, end, checkPortCallback); - } - return current; - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java index 89300e2ed..9e8e6c501 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java @@ -20,10 +20,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.ObjectUtils; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; @Slf4j @Service("zlm") @@ -136,9 +133,14 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { param.put("ssrc", ssrc); } JSONObject jsonObject = zlmresTfulUtils.stopSendRtp(mediaInfo, param); - log.info("停止发流结果: {}, 参数:{}", jsonObject.getString("msg"), JSON.toJSONString(param)); - return true; - + System.out.println(jsonObject); + if (jsonObject.getInteger("code") != null && jsonObject.getInteger("code") == 0) { + log.info("[停止发流] 成功: 参数:{}", JSON.toJSONString(param)); + return true; + }else { + log.info("停止发流结果: {}, 参数:{}", jsonObject.getString("msg"), JSON.toJSONString(param)); + return false; + } } @Override @@ -496,4 +498,22 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { throw new ControllerException(jsonObject.getInteger("code"), jsonObject.getString("msg")); } } + + @Override + public List listRtpServer(MediaServer mediaServer) { + JSONObject jsonObject = zlmresTfulUtils.listRtpServer(mediaServer); + List result = new ArrayList<>(); + if (jsonObject == null || jsonObject.getInteger("code") != 0) { + return result; + } + JSONArray data = jsonObject.getJSONArray("data"); + if (data == null || data.isEmpty()) { + return result; + } + for (int i = 0; i < data.size(); i++) { + JSONObject dataJSONObject = data.getJSONObject(i); + result.add(dataJSONObject.getString("stream_id")); + } + return result; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManager.java index 9d0ddca52..029e46ca8 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManager.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManager.java @@ -112,13 +112,13 @@ public class ZLMMediaServerStatusManager { @Async("taskExecutor") @EventListener public void onApplicationEvent(MediaServerDeleteEvent event) { - if (event.getMediaServerId() == null) { + if (event.getMediaServer() == null) { return; } - log.info("[ZLM-节点被移除] ID:" + event.getMediaServerId()); - offlineZlmPrimaryMap.remove(event.getMediaServerId()); - offlineZlmsecondaryMap.remove(event.getMediaServerId()); - offlineZlmTimeMap.remove(event.getMediaServerId()); + log.info("[ZLM-节点被移除] ID:" + event.getMediaServer().getId()); + offlineZlmPrimaryMap.remove(event.getMediaServer().getId()); + offlineZlmsecondaryMap.remove(event.getMediaServer().getId()); + offlineZlmTimeMap.remove(event.getMediaServer().getId()); } @Scheduled(fixedDelay = 10*1000) //每隔10秒检查一次 @@ -188,7 +188,7 @@ public class ZLMMediaServerStatusManager { mediaServerItem.setHookAliveInterval(10F); mediaServerService.update(mediaServerItem); // 发送上线通知 - eventPublisher.mediaServerOnlineEventPublish(mediaServerItem.getId()); + eventPublisher.mediaServerOnlineEventPublish(mediaServerItem); if(mediaServerItem.isAutoConfig()) { if (config == null) { JSONObject responseJSON = zlmresTfulUtils.getMediaServerConfig(mediaServerItem); @@ -213,7 +213,7 @@ public class ZLMMediaServerStatusManager { offlineZlmPrimaryMap.put(mediaServerItem.getId(), mediaServerItem); offlineZlmTimeMap.put(mediaServerItem.getId(), System.currentTimeMillis()); // 发送离线通知 - eventPublisher.mediaServerOfflineEventPublish(mediaServerItem.getId()); + eventPublisher.mediaServerOfflineEventPublish(mediaServerItem); mediaServerService.update(mediaServerItem); }, (int)(mediaServerItem.getHookAliveInterval() * 2 * 1000)); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java index b487f7ab5..fee0ccabc 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java @@ -2,7 +2,6 @@ package com.genersoft.iot.vmp.media.zlm; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.CommonCallback; -import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; import lombok.extern.slf4j.Slf4j; @@ -18,12 +17,6 @@ public class ZLMServerFactory { @Autowired private ZLMRESTfulUtils zlmresTfulUtils; - @Autowired - private UserSetting userSetting; - - @Autowired - private SendRtpPortManager sendRtpPortManager; - /** * 开启rtpServer diff --git a/src/main/java/com/genersoft/iot/vmp/service/ISendRtpServerService.java b/src/main/java/com/genersoft/iot/vmp/service/ISendRtpServerService.java index 8b05d3a4c..ca8c33d3e 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/ISendRtpServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/ISendRtpServerService.java @@ -40,4 +40,6 @@ public interface ISendRtpServerService { List queryByChannelId(int id); void deleteByStream(String stream); + + int getNextPort(MediaServer mediaServer); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java index 875d01bf0..178edb77e 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java @@ -8,16 +8,13 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; -import com.genersoft.iot.vmp.gb28181.service.*; -import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; +import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; +import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.bean.ResultForOnPublish; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.service.IMediaService; -import com.genersoft.iot.vmp.service.ISendRtpServerService; import com.genersoft.iot.vmp.service.IUserService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; @@ -56,33 +53,12 @@ public class MediaServiceImpl implements IMediaService { @Autowired private IInviteStreamService inviteStreamService; - @Autowired - private SSRCFactory ssrcFactory; - @Autowired private IDeviceChannelService deviceChannelService; @Autowired private SipInviteSessionManager sessionManager; - @Autowired - private IPlatformService platformService; - - @Autowired - private IGbChannelService channelService; - - @Autowired - private IDeviceService deviceService; - - @Autowired - private ISIPCommanderForPlatform commanderForPlatform; - - @Autowired - private ISIPCommander commander; - - @Autowired - private ISendRtpServerService sendRtpServerService; - @Override public boolean authenticatePlay(String app, String stream, String callId) { if (app == null || stream == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/SendRtpServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/SendRtpServerServiceImpl.java index 03713ec76..270b5403f 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/SendRtpServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/SendRtpServerServiceImpl.java @@ -4,19 +4,20 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.PlayException; import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo; -import com.genersoft.iot.vmp.gb28181.conf.StackLoggerImpl; import com.genersoft.iot.vmp.media.bean.MediaServer; -import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.service.ISendRtpServerService; import com.genersoft.iot.vmp.utils.JsonUtil; -import com.genersoft.iot.vmp.utils.redis.RedisUtil; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.math.NumberUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.support.atomic.RedisAtomicInteger; import org.springframework.stereotype.Service; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; @Service @Slf4j @@ -25,18 +26,14 @@ public class SendRtpServerServiceImpl implements ISendRtpServerService { @Autowired private UserSetting userSetting; - @Autowired - private SendRtpPortManager sendRtpPortManager; - @Autowired private RedisTemplate redisTemplate; - @Autowired - private StackLoggerImpl stackLoggerImpl; + @Override public SendRtpInfo createSendRtpInfo(MediaServer mediaServer, String ip, Integer port, String ssrc, String requesterId, String deviceId, Integer channelId, Boolean isTcp, Boolean rtcp) { - int localPort = sendRtpPortManager.getNextPort(mediaServer); + int localPort = getNextPort(mediaServer); if (localPort == 0) { return null; } @@ -48,7 +45,7 @@ public class SendRtpServerServiceImpl implements ISendRtpServerService { public SendRtpInfo createSendRtpInfo(MediaServer mediaServer, String ip, Integer port, String ssrc, String platformId, String app, String stream, Integer channelId, Boolean tcp, Boolean rtcp){ - int localPort = sendRtpPortManager.getNextPort(mediaServer); + int localPort = getNextPort(mediaServer); if (localPort <= 0) { throw new PlayException(javax.sip.message.Response.SERVER_INTERNAL_ERROR, "server internal error"); } @@ -64,7 +61,7 @@ public class SendRtpServerServiceImpl implements ISendRtpServerService { @Override public void update(SendRtpInfo sendRtpItem) { - redisTemplate.opsForValue().set(VideoManagerConstants.SEND_RTP_INFO_CALLID + sendRtpItem.getCallId(), sendRtpItem); + redisTemplate.opsForHash().put(VideoManagerConstants.SEND_RTP_INFO_CALLID, sendRtpItem.getCallId(), sendRtpItem); redisTemplate.opsForHash().put(VideoManagerConstants.SEND_RTP_INFO_STREAM + sendRtpItem.getStream(), sendRtpItem.getTargetId(), sendRtpItem); redisTemplate.opsForHash().put(VideoManagerConstants.SEND_RTP_INFO_CHANNEL + sendRtpItem.getChannelId(), sendRtpItem.getTargetId(), sendRtpItem); } @@ -77,8 +74,8 @@ public class SendRtpServerServiceImpl implements ISendRtpServerService { @Override public SendRtpInfo queryByCallId(String callId) { - String key = VideoManagerConstants.SEND_RTP_INFO_CALLID + callId; - return JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpInfo.class); + String key = VideoManagerConstants.SEND_RTP_INFO_CALLID; + return (SendRtpInfo)redisTemplate.opsForHash().get(key, callId); } @Override @@ -107,17 +104,9 @@ public class SendRtpServerServiceImpl implements ISendRtpServerService { if (sendRtpInfo == null) { return; } - redisTemplate.delete(VideoManagerConstants.SEND_RTP_INFO_CALLID + sendRtpInfo.getCallId()); - if (redisTemplate.opsForHash().size(VideoManagerConstants.SEND_RTP_INFO_STREAM + sendRtpInfo.getStream()) == 0) { - redisTemplate.delete(VideoManagerConstants.SEND_RTP_INFO_STREAM + sendRtpInfo.getStream()); - }else { - redisTemplate.opsForHash().delete(VideoManagerConstants.SEND_RTP_INFO_STREAM + sendRtpInfo.getStream(), sendRtpInfo.getTargetId()); - } - if (redisTemplate.opsForHash().size(VideoManagerConstants.SEND_RTP_INFO_CHANNEL + sendRtpInfo.getChannelId()) == 0) { - redisTemplate.delete(VideoManagerConstants.SEND_RTP_INFO_CHANNEL + sendRtpInfo.getChannelId()); - }else { - redisTemplate.opsForHash().delete(VideoManagerConstants.SEND_RTP_INFO_CHANNEL + sendRtpInfo.getChannelId(), sendRtpInfo.getTargetId()); - } + redisTemplate.opsForHash().delete(VideoManagerConstants.SEND_RTP_INFO_CALLID, sendRtpInfo.getCallId()); + redisTemplate.opsForHash().delete(VideoManagerConstants.SEND_RTP_INFO_STREAM + sendRtpInfo.getStream(), sendRtpInfo.getTargetId()); + redisTemplate.opsForHash().delete(VideoManagerConstants.SEND_RTP_INFO_CHANNEL + sendRtpInfo.getChannelId(), sendRtpInfo.getTargetId()); } @Override public void deleteByCallId(String callId) { @@ -166,15 +155,12 @@ public class SendRtpServerServiceImpl implements ISendRtpServerService { @Override public List queryAll() { - String key = VideoManagerConstants.SEND_RTP_INFO_CALLID + ":*"; - List queryResult = RedisUtil.scan(redisTemplate, key); + String key = VideoManagerConstants.SEND_RTP_INFO_CALLID; + List values = redisTemplate.opsForHash().values(key); List result= new ArrayList<>(); - - for (Object o : queryResult) { - String keyItem = (String) o; - result.add((SendRtpInfo) redisTemplate.opsForValue().get(keyItem)); + for (Object o : values) { + result.add((SendRtpInfo) o); } - return result; } @@ -195,4 +181,80 @@ public class SendRtpServerServiceImpl implements ISendRtpServerService { } return sendRtpInfos; } + + private Set getAllSendRtpPort() { + String key = VideoManagerConstants.SEND_RTP_INFO_CALLID; + List values = redisTemplate.opsForHash().values(key); + Set result = new HashSet<>(); + for (Object value : values) { + SendRtpInfo sendRtpInfo = (SendRtpInfo) value; + result.add(sendRtpInfo.getPort()); + } + return result; + } + + + @Override + public synchronized int getNextPort(MediaServer mediaServer) { + if (mediaServer == null) { + log.warn("[发送端口管理] 参数错误,mediaServer为NULL"); + return -1; + } + String sendIndexKey = VideoManagerConstants.SEND_RTP_PORT + userSetting.getServerId() + ":" + mediaServer.getId(); + Set sendRtpSet = getAllSendRtpPort(); + String sendRtpPortRange = mediaServer.getSendRtpPortRange(); + int startPort; + int endPort; + if (sendRtpPortRange != null) { + String[] portArray = sendRtpPortRange.split(","); + if (portArray.length != 2 || !NumberUtils.isParsable(portArray[0]) || !NumberUtils.isParsable(portArray[1])) { + log.warn("{}发送端口配置格式错误,自动使用50000-60000作为端口范围", mediaServer.getId()); + startPort = 50000; + endPort = 60000; + }else { + if ( Integer.parseInt(portArray[1]) - Integer.parseInt(portArray[0]) < 1) { + log.warn("{}发送端口配置错误,结束端口至少比开始端口大一,自动使用50000-60000作为端口范围", mediaServer.getId()); + startPort = 50000; + endPort = 60000; + }else { + startPort = Integer.parseInt(portArray[0]); + endPort = Integer.parseInt(portArray[1]); + } + } + }else { + log.warn("{}未设置发送端口默认值,自动使用50000-60000作为端口范围", mediaServer.getId()); + startPort = 50000; + endPort = 60000; + } + if (redisTemplate == null || redisTemplate.getConnectionFactory() == null) { + log.warn("{}获取redis连接信息失败", mediaServer.getId()); + return -1; + } + return getSendPort(startPort, endPort, sendIndexKey, sendRtpSet); + } + + private synchronized int getSendPort(int startPort, int endPort, String sendIndexKey, Set sendRtpPortSet){ + // TODO 这里改为只取偶数端口 + RedisAtomicInteger redisAtomicInteger = new RedisAtomicInteger(sendIndexKey , redisTemplate.getConnectionFactory()); + if (redisAtomicInteger.get() < startPort) { + redisAtomicInteger.set(startPort); + return startPort; + }else { + int port = redisAtomicInteger.getAndIncrement(); + if (port > endPort) { + redisAtomicInteger.set(startPort); + if (sendRtpPortSet.contains(startPort)) { + return getSendPort(startPort, endPort, sendIndexKey, sendRtpPortSet); + }else { + return startPort; + } + } + if (sendRtpPortSet.contains(port)) { + return getSendPort(startPort, endPort, sendIndexKey, sendRtpPortSet); + }else { + return port; + } + } + } + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java index dde2860a5..b74df842f 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java @@ -10,16 +10,13 @@ import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.event.hook.Hook; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.service.IMediaServerService; -import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.service.ISendRtpServerService; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import lombok.extern.slf4j.Slf4j; @@ -41,10 +38,7 @@ public class RedisRpcController { private IMediaServerService mediaServerService; @Autowired - private SendRtpPortManager sendRtpPortManager; - - @Autowired - private IRedisCatchStorage redisCatchStorage; + private ISendRtpServerService sendRtpServerService; @Autowired private UserSetting userSetting; @@ -56,13 +50,6 @@ public class RedisRpcController { private RedisTemplate redisTemplate; - @Autowired - private ISIPCommanderForPlatform commanderFroPlatform; - - @Autowired - private ISendRtpServerService sendRtpServerService; - - /** * 获取发流的信息 */ @@ -83,7 +70,7 @@ public class RedisRpcController { response.setStatusCode(200); } // 自平台内容 - int localPort = sendRtpPortManager.getNextPort(mediaServerItem); + int localPort = sendRtpServerService.getNextPort(mediaServerItem); if (localPort == 0) { log.info("[redis-rpc] getSendRtpItem->服务器端口资源不足" ); RedisRpcResponse response = request.getResponse(); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 4c47f19df..4d478c44e 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -162,25 +162,25 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public void updateDevice(Device device) { - String key = VideoManagerConstants.DEVICE_PREFIX + userSetting.getServerId(); + String key = VideoManagerConstants.DEVICE_PREFIX; redisTemplate.opsForHash().put(key, device.getDeviceId(), device); } @Override public void removeDevice(String deviceId) { - String key = VideoManagerConstants.DEVICE_PREFIX + userSetting.getServerId(); + String key = VideoManagerConstants.DEVICE_PREFIX; redisTemplate.opsForHash().delete(key, deviceId); } @Override public void removeAllDevice() { - String key = VideoManagerConstants.DEVICE_PREFIX + userSetting.getServerId(); + String key = VideoManagerConstants.DEVICE_PREFIX; redisTemplate.delete(key); } @Override public List getAllDevices() { - String key = VideoManagerConstants.DEVICE_PREFIX + userSetting.getServerId(); + String key = VideoManagerConstants.DEVICE_PREFIX; List result = new ArrayList<>(); List values = redisTemplate.opsForHash().values(key); for (Object value : values) { @@ -193,13 +193,16 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public Device getDevice(String deviceId) { - String key = VideoManagerConstants.DEVICE_PREFIX + userSetting.getServerId(); - Device device = (Device)redisTemplate.opsForHash().get(key, deviceId); - if (device == null){ + String key = VideoManagerConstants.DEVICE_PREFIX; + Device device; + Object object = redisTemplate.opsForHash().get(key, deviceId); + if (object == null){ device = deviceMapper.getDeviceByDeviceId(deviceId); if (device != null) { updateDevice(device); } + }else { + device = (Device)object; } return device; } @@ -418,9 +421,8 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public int getGbSendCount(String id) { - String key = VideoManagerConstants.SEND_RTP_INFO - + userSetting.getServerId() + "_*_" + id + "_*"; - return RedisUtil.scan(redisTemplate, key).size(); + String key = VideoManagerConstants.SEND_RTP_INFO_CALLID; + return redisTemplate.opsForHash().size(key).intValue(); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/IStreamProxyService.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/IStreamProxyService.java index f612abd94..1d848b64d 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/IStreamProxyService.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/IStreamProxyService.java @@ -64,17 +64,17 @@ public interface IStreamProxyService { /** * 新的节点加入 - * @param mediaServerId + * @param mediaServer * @return */ - void zlmServerOnline(String mediaServerId); + void zlmServerOnline(MediaServer mediaServer); /** * 节点离线 - * @param mediaServerId + * @param mediaServer * @return */ - void zlmServerOffline(String mediaServerId); + void zlmServerOffline(MediaServer mediaServer); /** * 更新代理流 diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java index 45e08a76a..3721edb1e 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java @@ -121,7 +121,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @EventListener @Transactional public void onApplicationEvent(MediaServerOnlineEvent event) { - zlmServerOnline(event.getMediaServerId()); + zlmServerOnline(event.getMediaServer()); } /** @@ -131,7 +131,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @EventListener @Transactional public void onApplicationEvent(MediaServerOfflineEvent event) { - zlmServerOffline(event.getMediaServerId()); + zlmServerOffline(event.getMediaServer()); } @@ -284,15 +284,14 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Override @Transactional - public void zlmServerOnline(String mediaServerId) { - MediaServer mediaServer = mediaServerService.getOne(mediaServerId); + public void zlmServerOnline(MediaServer mediaServer) { if (mediaServer == null) { return; } // 这里主要是控制数据库/redis缓存/以及zlm中存在的代理流 三者状态一致。以数据库中数据为根本 - redisCatchStorage.removeStream(mediaServerId, "PULL"); + redisCatchStorage.removeStream(mediaServer.getId(), "PULL"); - List streamProxies = streamProxyMapper.selectForEnableInMediaServer(mediaServerId, true); + List streamProxies = streamProxyMapper.selectForEnableInMediaServer(mediaServer.getId(), true); if (streamProxies.isEmpty()) { return; } @@ -359,11 +358,11 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } @Override - public void zlmServerOffline(String mediaServerId) { - List streamProxies = streamProxyMapper.selectForEnableInMediaServer(mediaServerId, true); + public void zlmServerOffline(MediaServer mediaServer) { + List streamProxies = streamProxyMapper.selectForEnableInMediaServer(mediaServer.getId(), true); // 清理redis相关的缓存 - redisCatchStorage.removeStream(mediaServerId, "PULL"); + redisCatchStorage.removeStream(mediaServer.getId(), "PULL"); if (streamProxies.isEmpty()) { return; @@ -395,7 +394,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { jsonObject.put("app", streamProxy.getApp()); jsonObject.put("stream", streamProxy.getStream()); jsonObject.put("register", false); - jsonObject.put("mediaServerId", mediaServerId); + jsonObject.put("mediaServerId", mediaServer); redisCatchStorage.sendStreamChangeMsg("pull", jsonObject); } } diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushService.java index 5992b0516..56bde718b 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushService.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushService.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.streamPush.service; +import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis; import com.genersoft.iot.vmp.streamPush.bean.StreamPush; @@ -36,12 +37,12 @@ public interface IStreamPushService { /** * 新的节点加入 */ - void zlmServerOnline(String mediaServerId); + void zlmServerOnline(MediaServer mediaServer); /** * 节点离线 */ - void zlmServerOffline(String mediaServerId); + void zlmServerOffline(MediaServer mediaServer); /** * 批量添加 diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java index 4c699f413..9af4b033c 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java @@ -159,7 +159,7 @@ public class StreamPushServiceImpl implements IStreamPushService { @EventListener @Transactional public void onApplicationEvent(MediaServerOnlineEvent event) { - zlmServerOnline(event.getMediaServerId()); + zlmServerOnline(event.getMediaServer()); } /** @@ -169,7 +169,7 @@ public class StreamPushServiceImpl implements IStreamPushService { @EventListener @Transactional public void onApplicationEvent(MediaServerOfflineEvent event) { - zlmServerOffline(event.getMediaServerId()); + zlmServerOffline(event.getMediaServer()); } @Override @@ -310,17 +310,16 @@ public class StreamPushServiceImpl implements IStreamPushService { @Override @Transactional - public void zlmServerOnline(String mediaServerId) { + public void zlmServerOnline(MediaServer mediaServer) { // 同步zlm推流信息 - MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId); - if (mediaServerItem == null) { + if (mediaServer == null) { return; } // 数据库记录 - List pushList = getPushList(mediaServerId); + List pushList = getPushList(mediaServer.getId()); Map pushItemMap = new HashMap<>(); // redis记录 - List mediaInfoList = redisCatchStorage.getStreams(mediaServerId, "PUSH"); + List mediaInfoList = redisCatchStorage.getStreams(mediaServer.getId(), "PUSH"); Map streamInfoPushItemMap = new HashMap<>(); if (!pushList.isEmpty()) { for (StreamPush streamPushItem : pushList) { @@ -340,7 +339,7 @@ public class StreamPushServiceImpl implements IStreamPushService { for (StreamAuthorityInfo streamAuthorityInfo : allStreamAuthorityInfo) { streamAuthorityInfoInfoMap.put(streamAuthorityInfo.getApp() + streamAuthorityInfo.getStream(), streamAuthorityInfo); } - List mediaList = mediaServerService.getMediaList(mediaServerItem, null, null, null); + List mediaList = mediaServerService.getMediaList(mediaServer, null, null, null); if (mediaList == null) { return; } @@ -368,12 +367,12 @@ public class StreamPushServiceImpl implements IStreamPushService { jsonObject.put("app", mediaInfo.getApp()); jsonObject.put("stream", mediaInfo.getStream()); jsonObject.put("register", false); - jsonObject.put("mediaServerId", mediaServerId); + jsonObject.put("mediaServerId", mediaServer.getId()); redisCatchStorage.sendStreamChangeMsg(type, jsonObject); // 移除redis内流的信息 - redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", mediaInfo.getApp(), mediaInfo.getStream()); + redisCatchStorage.removeStream(mediaServer.getId(), "PUSH", mediaInfo.getApp(), mediaInfo.getStream()); // 冗余数据,自己系统中自用 - redisCatchStorage.removePushListItem(mediaInfo.getApp(), mediaInfo.getStream(), mediaServerItem.getId()); + redisCatchStorage.removePushListItem(mediaInfo.getApp(), mediaInfo.getStream(), mediaServer.getId()); } } @@ -388,8 +387,8 @@ public class StreamPushServiceImpl implements IStreamPushService { @Override @Transactional - public void zlmServerOffline(String mediaServerId) { - List streamPushItems = streamPushMapper.selectAllByMediaServerId(mediaServerId); + public void zlmServerOffline(MediaServer mediaServer) { + List streamPushItems = streamPushMapper.selectAllByMediaServerId(mediaServer.getId()); if (!streamPushItems.isEmpty()) { for (StreamPush streamPushItem : streamPushItems) { stop(streamPushItem); @@ -403,21 +402,21 @@ public class StreamPushServiceImpl implements IStreamPushService { // 发送流停止消息 String type = "PUSH"; // 发送redis消息 - List mediaInfoList = redisCatchStorage.getStreams(mediaServerId, type); + List mediaInfoList = redisCatchStorage.getStreams(mediaServer.getId(), type); if (!mediaInfoList.isEmpty()) { for (MediaInfo mediaInfo : mediaInfoList) { // 移除redis内流的信息 - redisCatchStorage.removeStream(mediaServerId, type, mediaInfo.getApp(), mediaInfo.getStream()); + redisCatchStorage.removeStream(mediaServer.getId(), type, mediaInfo.getApp(), mediaInfo.getStream()); JSONObject jsonObject = new JSONObject(); jsonObject.put("serverId", userSetting.getServerId()); jsonObject.put("app", mediaInfo.getApp()); jsonObject.put("stream", mediaInfo.getStream()); jsonObject.put("register", false); - jsonObject.put("mediaServerId", mediaServerId); + jsonObject.put("mediaServerId", mediaServer.getId()); redisCatchStorage.sendStreamChangeMsg(type, jsonObject); // 冗余数据,自己系统中自用 - redisCatchStorage.removePushListItem(mediaInfo.getApp(), mediaInfo.getStream(), mediaServerId); + redisCatchStorage.removePushListItem(mediaInfo.getApp(), mediaInfo.getStream(), mediaServer.getId()); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java index 787defa28..efffbf909 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java @@ -11,7 +11,7 @@ import com.genersoft.iot.vmp.media.event.hook.Hook; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.service.IMediaServerService; -import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; +import com.genersoft.iot.vmp.service.ISendRtpServerService; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; @@ -28,9 +28,7 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.web.bind.annotation.*; import java.io.IOException; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -48,7 +46,7 @@ public class PsController { private IMediaServerService mediaServerService; @Autowired - private SendRtpPortManager sendRtpPortManager; + private ISendRtpServerService sendRtpServerService; @Autowired private UserSetting userSetting; @@ -133,7 +131,7 @@ public class PsController { if (isSend != null && isSend) { String key = VideoManagerConstants.WVP_OTHER_SEND_PS_INFO + userSetting.getServerId() + "_" + callId; // 预创建发流信息 - int port = sendRtpPortManager.getNextPort(mediaServer); + int port = sendRtpServerService.getNextPort(mediaServer); otherPsSendInfo.setSendLocalIp(mediaServer.getSdpIp()); otherPsSendInfo.setSendLocalPort(port); @@ -249,11 +247,6 @@ public class PsController { if (sendInfo == null){ throw new ControllerException(ErrorCode.ERROR100.getCode(), "未开启发流"); } - Map param = new HashMap<>(); - param.put("vhost","__defaultVhost__"); - param.put("app",sendInfo.getPushApp()); - param.put("stream",sendInfo.getPushStream()); - param.put("ssrc",sendInfo.getPushSSRC()); MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer(); boolean result = mediaServerService.stopSendRtp(mediaServerItem, sendInfo.getPushApp(), sendInfo.getStream(), sendInfo.getPushSSRC()); if (!result) { @@ -283,6 +276,6 @@ public class PsController { // }).start(); // } - return sendRtpPortManager.getNextPort(defaultMediaServer); + return sendRtpServerService.getNextPort(defaultMediaServer); } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java index a8ef9df2f..b78e9a67c 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java @@ -11,7 +11,7 @@ import com.genersoft.iot.vmp.media.event.hook.Hook; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.service.IMediaServerService; -import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; +import com.genersoft.iot.vmp.service.ISendRtpServerService; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; @@ -41,7 +41,7 @@ import java.util.concurrent.TimeUnit; public class RtpController { @Autowired - private SendRtpPortManager sendRtpPortManager; + private ISendRtpServerService sendRtpServerService; @Autowired private HookSubscribe hookSubscribe; @@ -130,8 +130,8 @@ public class RtpController { redisTemplate.opsForValue().set(receiveKey, otherRtpSendInfo); if (isSend != null && isSend) { // 预创建发流信息 - int portForVideo = sendRtpPortManager.getNextPort(mediaServer); - int portForAudio = sendRtpPortManager.getNextPort(mediaServer); + int portForVideo = sendRtpServerService.getNextPort(mediaServer); + int portForAudio = sendRtpServerService.getNextPort(mediaServer); otherRtpSendInfo.setSendLocalIp(mediaServer.getSdpIp()); otherRtpSendInfo.setSendLocalPortForVideo(portForVideo); diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java index d7facd239..3c22dbb82 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java @@ -139,7 +139,11 @@ public class ServerController { @DeleteMapping(value = "/media_server/delete") @ResponseBody public void deleteMediaServer(@RequestParam String id) { - mediaServerService.delete(id); + MediaServer mediaServer = mediaServerService.getOne(id); + if(mediaServer == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "流媒体不存在"); + } + mediaServerService.delete(mediaServer); }