diff --git a/README.md b/README.md index df71b6c0d..4f09aba7c 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,6 @@ WEB VIDEO PLATFORM是一个基于GB28181-2016标准实现的开箱即用的网 # 文档 wvp使用文档 [https://doc.wvp-pro.cn](https://doc.wvp-pro.cn) ZLM使用文档 [https://github.com/ZLMediaKit/ZLMediaKit](https://github.com/ZLMediaKit/ZLMediaKit) -> wvp文档由gitee提供服务,如果遇到打不开请多刷新几次。 # 付费社群 [![社群](doc/_media/shequ.png "shequ")](https://t.zsxq.com/0d8VAD3Dm) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java index 167b9f201..e6cfac3d5 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java @@ -37,15 +37,14 @@ public class RecordEndEventListener implements ApplicationListener{}, stream->{}", event.getApp(), event.getStream()); addCount(event.getMediaServer().getId()); + String type = OriginType.values()[event.getMediaInfo().getOriginType()].getType(); + redisCatchStorage.addStream(event.getMediaServer(), type, event.getApp(), event.getStream(), event.getMediaInfo()); } } @@ -110,7 +113,15 @@ public class MediaServerServiceImpl implements IMediaServerService { if ("rtsp".equals(event.getSchema())) { logger.info("流变化:注销, app->{}, stream->{}", event.getApp(), event.getStream()); removeCount(event.getMediaServer().getId()); + MediaInfo mediaInfo = redisCatchStorage.getStreamInfo( + event.getApp(), event.getStream(), event.getMediaServer().getId()); + if (mediaInfo == null) { + return; + } + String type = OriginType.values()[mediaInfo.getOriginType()].getType(); + redisCatchStorage.removeStream(mediaInfo.getMediaServer().getId(), type, event.getApp(), event.getStream()); } + } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java index 797286fe2..f498479d8 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java @@ -35,7 +35,7 @@ public class StreamPushItem extends GbStream implements Comparable onStreamChangedHookParams = redisCatchStorage.getStreams(mediaServerId, type); - if (onStreamChangedHookParams.size() > 0) { - for (OnStreamChangedHookParam onStreamChangedHookParam : onStreamChangedHookParams) { + List mediaInfoList = redisCatchStorage.getStreams(mediaServerId, type); + if (mediaInfoList.size() > 0) { + for (MediaInfo mediaInfo : mediaInfoList) { JSONObject jsonObject = new JSONObject(); jsonObject.put("serverId", userSetting.getServerId()); - jsonObject.put("app", onStreamChangedHookParam.getApp()); - jsonObject.put("stream", onStreamChangedHookParam.getStream()); + jsonObject.put("app", mediaInfo.getApp()); + jsonObject.put("stream", mediaInfo.getStream()); jsonObject.put("register", false); jsonObject.put("mediaServerId", mediaServerId); redisCatchStorage.sendStreamChangeMsg(type, jsonObject); // 移除redis内流的信息 - redisCatchStorage.removeStream(mediaServerId, type, onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream()); + redisCatchStorage.removeStream(mediaServerId, type, mediaInfo.getApp(), mediaInfo.getStream()); } } } @@ -534,8 +533,8 @@ public class StreamProxyServiceImpl implements IStreamProxyService { private void syncPullStream(String mediaServerId){ MediaServer mediaServer = mediaServerService.getOne(mediaServerId); if (mediaServer != null) { - List allPullStream = redisCatchStorage.getStreams(mediaServerId, "PULL"); - if (!allPullStream.isEmpty()) { + List mediaInfoList = redisCatchStorage.getStreams(mediaServerId, "PULL"); + if (!mediaInfoList.isEmpty()) { List mediaList = mediaServerService.getMediaList(mediaServer, null, null, null); Map stringStreamInfoMap = new HashMap<>(); if (mediaList != null && !mediaList.isEmpty()) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index 652bcdd69..7a1494027 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -156,10 +156,10 @@ public class StreamPushServiceImpl implements IStreamPushService { @EventListener public void onApplicationEvent(MediaDepartureEvent event) { // 兼容流注销时类型从redis记录获取 - OnStreamChangedHookParam onStreamChangedHookParam = redisCatchStorage.getStreamInfo( + MediaInfo mediaInfo = redisCatchStorage.getStreamInfo( event.getApp(), event.getStream(), event.getMediaServer().getId()); - if (onStreamChangedHookParam != null) { - String type = OriginType.values()[onStreamChangedHookParam.getOriginType()].getType(); + if (mediaInfo != null) { + String type = OriginType.values()[mediaInfo.getOriginType()].getType(); redisCatchStorage.removeStream(event.getMediaServer().getId(), type, event.getApp(), event.getStream()); if ("PUSH".equalsIgnoreCase(type)) { // 冗余数据,自己系统中自用 @@ -217,7 +217,7 @@ public class StreamPushServiceImpl implements IStreamPushService { streamPushItem.setStream(item.getStream()); streamPushItem.setAliveSecond(item.getAliveSecond()); streamPushItem.setOriginSock(item.getOriginSock()); - streamPushItem.setTotalReaderCount(item.getTotalReaderCount() + ""); + streamPushItem.setTotalReaderCount(item.getTotalReaderCount()); streamPushItem.setOriginType(item.getOriginType()); streamPushItem.setOriginTypeStr(item.getOriginTypeStr()); streamPushItem.setOriginUrl(item.getOriginUrl()); @@ -302,8 +302,8 @@ public class StreamPushServiceImpl implements IStreamPushService { List pushList = getPushList(mediaServerId); Map pushItemMap = new HashMap<>(); // redis记录 - List onStreamChangedHookParams = redisCatchStorage.getStreams(mediaServerId, "PUSH"); - Map streamInfoPushItemMap = new HashMap<>(); + List mediaInfoList = redisCatchStorage.getStreams(mediaServerId, "PUSH"); + Map streamInfoPushItemMap = new HashMap<>(); if (pushList.size() > 0) { for (StreamPushItem streamPushItem : pushList) { if (ObjectUtils.isEmpty(streamPushItem.getGbId())) { @@ -311,9 +311,9 @@ public class StreamPushServiceImpl implements IStreamPushService { } } } - if (onStreamChangedHookParams.size() > 0) { - for (OnStreamChangedHookParam onStreamChangedHookParam : onStreamChangedHookParams) { - streamInfoPushItemMap.put(onStreamChangedHookParam.getApp() + onStreamChangedHookParam.getStream(), onStreamChangedHookParam); + if (mediaInfoList.size() > 0) { + for (MediaInfo mediaInfo : mediaInfoList) { + streamInfoPushItemMap.put(mediaInfo.getApp() + mediaInfo.getStream(), mediaInfo); } } // 获取所有推流鉴权信息,清理过期的 @@ -352,21 +352,21 @@ public class StreamPushServiceImpl implements IStreamPushService { } } - Collection offlineOnStreamChangedHookParamList = streamInfoPushItemMap.values(); - if (offlineOnStreamChangedHookParamList.size() > 0) { + Collection mediaInfos = streamInfoPushItemMap.values(); + if (mediaInfos.size() > 0) { String type = "PUSH"; - for (OnStreamChangedHookParam offlineOnStreamChangedHookParam : offlineOnStreamChangedHookParamList) { + for (MediaInfo mediaInfo : mediaInfos) { JSONObject jsonObject = new JSONObject(); jsonObject.put("serverId", userSetting.getServerId()); - jsonObject.put("app", offlineOnStreamChangedHookParam.getApp()); - jsonObject.put("stream", offlineOnStreamChangedHookParam.getStream()); + jsonObject.put("app", mediaInfo.getApp()); + jsonObject.put("stream", mediaInfo.getStream()); jsonObject.put("register", false); jsonObject.put("mediaServerId", mediaServerId); redisCatchStorage.sendStreamChangeMsg(type, jsonObject); // 移除redis内流的信息 - redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream()); + redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", mediaInfo.getApp(), mediaInfo.getStream()); // 冗余数据,自己系统中自用 - redisCatchStorage.removePushListItem(offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream(), mediaServerItem.getId()); + redisCatchStorage.removePushListItem(mediaInfo.getApp(), mediaInfo.getStream(), mediaServerItem.getId()); } } @@ -391,21 +391,21 @@ public class StreamPushServiceImpl implements IStreamPushService { // 发送流停止消息 String type = "PUSH"; // 发送redis消息 - List streamInfoList = redisCatchStorage.getStreams(mediaServerId, type); - if (streamInfoList.size() > 0) { - for (OnStreamChangedHookParam onStreamChangedHookParam : streamInfoList) { + List mediaInfoList = redisCatchStorage.getStreams(mediaServerId, type); + if (mediaInfoList.size() > 0) { + for (MediaInfo mediaInfo : mediaInfoList) { // 移除redis内流的信息 - redisCatchStorage.removeStream(mediaServerId, type, onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream()); + redisCatchStorage.removeStream(mediaServerId, type, mediaInfo.getApp(), mediaInfo.getStream()); JSONObject jsonObject = new JSONObject(); jsonObject.put("serverId", userSetting.getServerId()); - jsonObject.put("app", onStreamChangedHookParam.getApp()); - jsonObject.put("stream", onStreamChangedHookParam.getStream()); + jsonObject.put("app", mediaInfo.getApp()); + jsonObject.put("stream", mediaInfo.getStream()); jsonObject.put("register", false); jsonObject.put("mediaServerId", mediaServerId); redisCatchStorage.sendStreamChangeMsg(type, jsonObject); // 冗余数据,自己系统中自用 - redisCatchStorage.removePushListItem(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream(), mediaServerId); + redisCatchStorage.removePushListItem(mediaInfo.getApp(), mediaInfo.getStream(), mediaServerId); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushUploadFileHandler.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushUploadFileHandler.java index a12715939..2c3a3fa43 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushUploadFileHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushUploadFileHandler.java @@ -137,7 +137,7 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener getStreams(String mediaServerId, String pull); + List getStreams(String mediaServerId, String pull); /** * 将device信息写入redis @@ -134,7 +134,7 @@ public interface IRedisCatchStorage { void resetAllSN(); - OnStreamChangedHookParam getStreamInfo(String app, String streamId, String mediaServerId); + MediaInfo getStreamInfo(String app, String streamId, String mediaServerId); void addCpuInfo(double cpuInfo); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index b17b4d743..51878c7ed 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -9,11 +9,11 @@ import com.genersoft.iot.vmp.gb28181.bean.AlarmChannelMessage; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; +import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; +import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; -import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -315,14 +315,14 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { } @Override - public void addStream(MediaServer mediaServerItem, String type, String app, String streamId, OnStreamChangedHookParam onStreamChangedHookParam) { + public void addStream(MediaServer mediaServerItem, String type, String app, String streamId, MediaInfo mediaInfo) { // 查找是否使用了callID StreamAuthorityInfo streamAuthorityInfo = getStreamAuthorityInfo(app, streamId); String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type + "_" + app + "_" + streamId + "_" + mediaServerItem.getId(); if (streamAuthorityInfo != null) { - onStreamChangedHookParam.setCallId(streamAuthorityInfo.getCallId()); + mediaInfo.setCallId(streamAuthorityInfo.getCallId()); } - redisTemplate.opsForValue().set(key, onStreamChangedHookParam); + redisTemplate.opsForValue().set(key, mediaInfo); } @Override @@ -341,13 +341,13 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { } @Override - public List getStreams(String mediaServerId, String type) { - List result = new ArrayList<>(); + public List getStreams(String mediaServerId, String type) { + List result = new ArrayList<>(); String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type + "_*_*_" + mediaServerId; List streams = RedisUtil.scan(redisTemplate, key); for (Object stream : streams) { - OnStreamChangedHookParam onStreamChangedHookParam = (OnStreamChangedHookParam)redisTemplate.opsForValue().get(stream); - result.add(onStreamChangedHookParam); + MediaInfo mediaInfo = (MediaInfo)redisTemplate.opsForValue().get(stream); + result.add(mediaInfo); } return result; } @@ -466,14 +466,14 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override - public OnStreamChangedHookParam getStreamInfo(String app, String streamId, String mediaServerId) { + public MediaInfo getStreamInfo(String app, String streamId, String mediaServerId) { String scanKey = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_*_" + app + "_" + streamId + "_" + mediaServerId; - OnStreamChangedHookParam result = null; + MediaInfo result = null; List keys = RedisUtil.scan(redisTemplate, scanKey); if (keys.size() > 0) { String key = (String) keys.get(0); - result = JsonUtil.redisJsonToObject(redisTemplate, key, OnStreamChangedHookParam.class); + result = JsonUtil.redisJsonToObject(redisTemplate, key, MediaInfo.class); } return result; diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java index 309cd694d..2ea4b8730 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java @@ -274,7 +274,7 @@ public class StreamPushController { stream.setStatus(false); stream.setPushIng(false); stream.setAliveSecond(0L); - stream.setTotalReaderCount("0"); + stream.setTotalReaderCount(0); if (!streamPushService.add(stream)) { throw new ControllerException(ErrorCode.ERROR100); }