diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java index ab90ac686..836da67d1 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java @@ -6,53 +6,32 @@ import com.genersoft.iot.vmp.service.redisMsg.*; import com.genersoft.iot.vmp.utils.redis.FastJsonRedisSerializer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cache.annotation.CachingConfigurerSupport; +import com.alibaba.fastjson2.support.spring.data.redis.GenericFastJsonRedisSerializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.annotation.Order; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.data.redis.listener.PatternTopic; -import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.serializer.StringRedisSerializer; /** - * @description:Redis中间件配置类,使用spring-data-redis集成,自动从application.yml中加载redis配置 - * @author: swwheihei - * @date: 2019年5月30日 上午10:58:25 + * Redis中间件配置类,使用spring-data-redis集成,自动从application.yml中加载redis配置 + * swwheihei + * 2019年5月30日 上午10:58:25 * */ @Configuration @Order(value=1) -public class RedisConfig extends CachingConfigurerSupport { +public class RedisConfig { - @Autowired - private RedisGpsMsgListener redisGPSMsgListener; - - @Autowired - private RedisAlarmMsgListener redisAlarmMsgListener; - - @Autowired - private RedisStreamMsgListener redisStreamMsgListener; - - @Autowired - private RedisGbPlayMsgListener redisGbPlayMsgListener; - - @Autowired - private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener; - - @Autowired - private RedisPushStreamStatusListMsgListener redisPushStreamListMsgListener; - - @Autowired - private RedisPushStreamResponseListener redisPushStreamResponseListener; @Bean public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate redisTemplate = new RedisTemplate<>(); // 使用fastJson序列化 - FastJsonRedisSerializer fastJsonRedisSerializer = new FastJsonRedisSerializer(Object.class); + GenericFastJsonRedisSerializer fastJsonRedisSerializer = new GenericFastJsonRedisSerializer(); // value值的序列化采用fastJsonRedisSerializer redisTemplate.setValueSerializer(fastJsonRedisSerializer); redisTemplate.setHashValueSerializer(fastJsonRedisSerializer); @@ -63,27 +42,4 @@ public class RedisConfig extends CachingConfigurerSupport { redisTemplate.setConnectionFactory(redisConnectionFactory); return redisTemplate; } - - - /** - * redis消息监听器容器 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器 - * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理 - * - * @param connectionFactory - * @return - */ - @Bean - RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { - - RedisMessageListenerContainer container = new RedisMessageListenerContainer(); - container.setConnectionFactory(connectionFactory); - container.addMessageListener(redisGPSMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_GPS)); - container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE)); - container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH")); - container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY)); - container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE)); - container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE)); - container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE)); - return container; - } } diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java new file mode 100644 index 000000000..9f484266b --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java @@ -0,0 +1,68 @@ +package com.genersoft.iot.vmp.conf.redis; + + +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.service.redisMsg.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.annotation.Order; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.listener.PatternTopic; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; + + +/** + * @description:Redis中间件配置类,使用spring-data-redis集成,自动从application.yml中加载redis配置 + * @author: swwheihei + * @date: 2019年5月30日 上午10:58:25 + * + */ +@Configuration +@Order(value=1) +public class RedisMsgListenConfig { + + @Autowired + private RedisGpsMsgListener redisGPSMsgListener; + + @Autowired + private RedisAlarmMsgListener redisAlarmMsgListener; + + @Autowired + private RedisStreamMsgListener redisStreamMsgListener; + + @Autowired + private RedisGbPlayMsgListener redisGbPlayMsgListener; + + @Autowired + private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener; + + @Autowired + private RedisPushStreamStatusListMsgListener redisPushStreamListMsgListener; + + @Autowired + private RedisPushStreamResponseListener redisPushStreamResponseListener; + + + /** + * redis消息监听器容器 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器 + * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理 + * + * @param connectionFactory + * @return + */ + @Bean + RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { + + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(connectionFactory); + container.addMessageListener(redisGPSMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_GPS)); + container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE)); + container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH")); + container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY)); + container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE)); + container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE)); + container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE)); + return container; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/conf/security/WebSecurityConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/security/WebSecurityConfig.java index a905e4424..96ae6b91c 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/security/WebSecurityConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/security/WebSecurityConfig.java @@ -72,22 +72,23 @@ public class WebSecurityConfig extends WebSecurityConfigurerAdapter { **/ @Override public void configure(WebSecurity web) { - - ArrayList matchers = new ArrayList<>(); - matchers.add("/"); - matchers.add("/#/**"); - matchers.add("/static/**"); - matchers.add("/index.html"); - matchers.add("/doc.html"); - matchers.add("/webjars/**"); - matchers.add("/swagger-resources/**"); - matchers.add("/v3/api-docs/**"); - matchers.add("/js/**"); - matchers.add("/api/device/query/snap/**"); - matchers.add("/record_proxy/*/**"); - matchers.addAll(userSetting.getInterfaceAuthenticationExcludes()); - // 可以直接访问的静态数据 - web.ignoring().antMatchers(matchers.toArray(new String[0])); + if (userSetting.isInterfaceAuthentication()) { + ArrayList matchers = new ArrayList<>(); + matchers.add("/"); + matchers.add("/#/**"); + matchers.add("/static/**"); + matchers.add("/index.html"); + matchers.add("/doc.html"); + matchers.add("/webjars/**"); + matchers.add("/swagger-resources/**"); + matchers.add("/v3/api-docs/**"); + matchers.add("/js/**"); + matchers.add("/api/device/query/snap/**"); + matchers.add("/record_proxy/*/**"); + matchers.addAll(userSetting.getInterfaceAuthenticationExcludes()); + // 可以直接访问的静态数据 + web.ignoring().antMatchers(matchers.toArray(new String[0])); + } } /** diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java index fad84a6b8..e856faf66 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java @@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.utils.JsonUtil; import com.genersoft.iot.vmp.utils.redis.RedisUtil; import gov.nist.javax.sip.message.SIPResponse; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; @@ -15,9 +16,7 @@ import java.util.ArrayList; import java.util.List; /** - * @description:视频流session管理器,管理视频预览、预览回放的通信句柄 - * @author: swwheihei - * @date: 2020年5月13日 下午4:03:02 + * 视频流session管理器,管理视频预览、预览回放的通信句柄 */ @Component public class VideoStreamSessionManager { @@ -25,6 +24,9 @@ public class VideoStreamSessionManager { @Autowired private UserSetting userSetting; + @Autowired + private RedisTemplate redisTemplate; + public enum SessionType { play, playback, @@ -54,7 +56,7 @@ public class VideoStreamSessionManager { ssrcTransaction.setMediaServerId(mediaServerId); ssrcTransaction.setType(type); - RedisUtil.set(VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + redisTemplate.opsForValue().set(VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + "_" + deviceId + "_" + channelId + "_" + callId + "_" + stream, ssrcTransaction); } @@ -73,11 +75,11 @@ public class VideoStreamSessionManager { stream ="*"; } String key = VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + "_" + deviceId + "_" + channelId + "_" + callId+ "_" + stream; - List scanResult = RedisUtil.scan(key); + List scanResult = RedisUtil.scan(redisTemplate, key); if (scanResult.size() == 0) { return null; } - return (SsrcTransaction)RedisUtil.get((String) scanResult.get(0)); + return (SsrcTransaction)redisTemplate.opsForValue().get(scanResult.get(0)); } public List getSsrcTransactionForAll(String deviceId, String channelId, String callId, String stream){ @@ -94,13 +96,13 @@ public class VideoStreamSessionManager { stream ="*"; } String key = VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + "_" + deviceId + "_" + channelId + "_" + callId+ "_" + stream; - List scanResult = RedisUtil.scan(key); + List scanResult = RedisUtil.scan(redisTemplate, key); if (scanResult.size() == 0) { return null; } List result = new ArrayList<>(); for (Object keyObj : scanResult) { - result.add((SsrcTransaction)RedisUtil.get((String) keyObj)); + result.add((SsrcTransaction)redisTemplate.opsForValue().get(keyObj)); } return result; } @@ -126,17 +128,17 @@ public class VideoStreamSessionManager { if (ssrcTransaction == null) { return; } - RedisUtil.del(VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + "_" + redisTemplate.delete(VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + "_" + deviceId + "_" + channelId + "_" + ssrcTransaction.getCallId() + "_" + ssrcTransaction.getStream()); } public List getAllSsrc() { - List ssrcTransactionKeys = RedisUtil.scan(String.format("%s_*_*_*_*", VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX+ userSetting.getServerId())); + List ssrcTransactionKeys = RedisUtil.scan(redisTemplate, String.format("%s_*_*_*_*", VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX+ userSetting.getServerId())); List result= new ArrayList<>(); - for (int i = 0; i < ssrcTransactionKeys.size(); i++) { - String key = (String)ssrcTransactionKeys.get(i); - SsrcTransaction ssrcTransaction = JsonUtil.redisJsonToObject(key, SsrcTransaction.class); + for (Object ssrcTransactionKey : ssrcTransactionKeys) { + String key = (String) ssrcTransactionKey; + SsrcTransaction ssrcTransaction = JsonUtil.redisJsonToObject(redisTemplate, key, SsrcTransaction.class); result.add(ssrcTransaction); } return result; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java index 8f4bb2620..f06abbaa8 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java @@ -25,7 +25,7 @@ public interface ISIPCommanderForPlatform { void register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException; void register(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException; - void register(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo, WWWAuthenticateHeader www, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain, boolean isRegister) throws SipException, InvalidArgumentException, ParseException; + void register(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo, WWWAuthenticateHeader www, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean isRegister) throws SipException, InvalidArgumentException, ParseException; /** * 向上级平台注销 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java index dd40b39f1..315ddecdf 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java @@ -46,7 +46,7 @@ public class SIPRequestHeaderPlarformProvider { @Autowired private IRedisCatchStorage redisCatchStorage; - public Request createRegisterRequest(@NotNull ParentPlatform parentPlatform, long CSeq, String fromTag, String toTag, CallIdHeader callIdHeader, boolean isRegister) throws ParseException, InvalidArgumentException, PeerUnavailableException { + public Request createRegisterRequest(@NotNull ParentPlatform parentPlatform, long CSeq, String fromTag, String toTag, CallIdHeader callIdHeader, int expires) throws ParseException, InvalidArgumentException, PeerUnavailableException { Request request = null; String sipAddress = parentPlatform.getDeviceIp() + ":" + parentPlatform.getDevicePort(); //请求行 @@ -79,8 +79,8 @@ public class SIPRequestHeaderPlarformProvider { .createSipURI(parentPlatform.getDeviceGBId(), sipAddress)); request.addHeader(sipLayer.getSipFactory().createHeaderFactory().createContactHeader(concatAddress)); - ExpiresHeader expires = sipLayer.getSipFactory().createHeaderFactory().createExpiresHeader(isRegister ? parentPlatform.getExpires() : 0); - request.addHeader(expires); + ExpiresHeader expiresHeader = sipLayer.getSipFactory().createHeaderFactory().createExpiresHeader(expires); + request.addHeader(expiresHeader); request.addHeader(SipUtils.createUserAgentHeader(sipLayer.getSipFactory(), gitUtil)); @@ -88,10 +88,10 @@ public class SIPRequestHeaderPlarformProvider { } public Request createRegisterRequest(@NotNull ParentPlatform parentPlatform, String fromTag, String toTag, - WWWAuthenticateHeader www , CallIdHeader callIdHeader, boolean isRegister) throws ParseException, PeerUnavailableException, InvalidArgumentException { + WWWAuthenticateHeader www , CallIdHeader callIdHeader, int expires) throws ParseException, PeerUnavailableException, InvalidArgumentException { - Request registerRequest = createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(), fromTag, toTag, callIdHeader, isRegister); + Request registerRequest = createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(), fromTag, toTag, callIdHeader, expires); SipURI requestURI = sipLayer.getSipFactory().createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP() + ":" + parentPlatform.getServerPort()); if (www == null) { AuthorizationHeader authorizationHeader = sipLayer.getSipFactory().createHeaderFactory().createAuthorizationHeader("Digest"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 452da8eea..630d7daeb 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -91,23 +91,23 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { @Override public void register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException { - register(parentPlatform, null, null, errorEvent, okEvent, false, true); + register(parentPlatform, null, null, errorEvent, okEvent, true); } @Override public void register(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException { - register(parentPlatform, sipTransactionInfo, null, errorEvent, okEvent, false, true); + register(parentPlatform, sipTransactionInfo, null, errorEvent, okEvent, true); } @Override public void unregister(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException { - register(parentPlatform, sipTransactionInfo, null, errorEvent, okEvent, false, false); + register(parentPlatform, sipTransactionInfo, null, errorEvent, okEvent, false); } @Override public void register(ParentPlatform parentPlatform, @Nullable SipTransactionInfo sipTransactionInfo, @Nullable WWWAuthenticateHeader www, - SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain, boolean isRegister) throws SipException, InvalidArgumentException, ParseException { + SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean isRegister) throws SipException, InvalidArgumentException, ParseException { Request request; CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(parentPlatform.getDeviceIp(),parentPlatform.getTransport()); @@ -125,10 +125,10 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { } } - if (!registerAgain ) { + if (www == null ) { request = headerProviderPlatformProvider.createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(), fromTag, - toTag, callIdHeader, isRegister); + toTag, callIdHeader, isRegister? parentPlatform.getExpires() : 0); // 将 callid 写入缓存, 等注册成功可以更新状态 String callIdFromHeader = callIdHeader.getCallId(); redisCatchStorage.updatePlatformRegisterInfo(callIdFromHeader, PlatformRegisterInfo.getInstance(parentPlatform.getServerGBId(), isRegister)); @@ -146,7 +146,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { }); }else { - request = headerProviderPlatformProvider.createRegisterRequest(parentPlatform, fromTag, toTag, www, callIdHeader, isRegister); + request = headerProviderPlatformProvider.createRegisterRequest(parentPlatform, fromTag, toTag, www, callIdHeader, isRegister? parentPlatform.getExpires() : 0); } sipSender.transmitRequest(parentPlatform.getDeviceIp(), request, null, okEvent); @@ -518,10 +518,10 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { private void sendNotify(ParentPlatform parentPlatform, String catalogXmlContent, SubscribeInfo subscribeInfo, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent ) throws SipException, ParseException, InvalidArgumentException { - MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipLayer.getSipFactory().createMessageFactory(); + MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipLayer.getSipFactory().createMessageFactory(); String characterSet = parentPlatform.getCharacterSet(); - // 设置编码, 防止中文乱码 - messageFactory.setDefaultContentEncodingCharset(characterSet); + // 设置编码, 防止中文乱码 + messageFactory.setDefaultContentEncodingCharset(characterSet); SIPRequest notifyRequest = headerProviderPlatformProvider.createNotifyRequest(parentPlatform, catalogXmlContent, subscribeInfo); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index 128897402..a5e27c3b0 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -152,26 +152,30 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements MobilePosition mobilePosition = new MobilePosition(); mobilePosition.setCreateTime(DateUtil.getNow()); + Element deviceIdElement = rootElement.element("DeviceID"); String channelId = deviceIdElement.getTextTrim().toString(); Device device = redisCatchStorage.getDevice(deviceId); if (device == null) { - // 根据通道id查询设备Id - List deviceList = deviceChannelService.getDeviceByChannelId(channelId); - if (deviceList.size() > 0) { - device = deviceList.get(0); - }else { - logger.warn("[mobilePosition移动位置Notify] 未找到通道{}所属的设备", channelId); - return; + device = redisCatchStorage.getDevice(channelId); + if (device == null) { + // 根据通道id查询设备Id + List deviceList = deviceChannelService.getDeviceByChannelId(channelId); + if (deviceList.size() > 0) { + device = deviceList.get(0); + } } } - if (device != null) { - if (!ObjectUtils.isEmpty(device.getName())) { - mobilePosition.setDeviceName(device.getName()); - } + if (device == null) { + logger.warn("[mobilePosition移动位置Notify] 未找到通道{}所属的设备", channelId); + return; } - mobilePosition.setDeviceId(XmlUtil.getText(rootElement, "DeviceID")); + if (!ObjectUtils.isEmpty(device.getName())) { + mobilePosition.setDeviceName(device.getName()); + } + + mobilePosition.setDeviceId(device.getDeviceId()); mobilePosition.setChannelId(channelId); String time = XmlUtil.getText(rootElement, "Time"); mobilePosition.setTime(time); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java index 9033963da..3fee5e539 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java @@ -84,7 +84,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen try { RequestEventExt evtExt = (RequestEventExt) evt; String requestAddress = evtExt.getRemoteIpAddress() + ":" + evtExt.getRemotePort(); - logger.info("[注册请求] 开始处理: {}", requestAddress); + // MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer(); // QueryExp protocol = Query.match(Query.attr("protocol"), Query.value("HTTP/1.1")); //// ObjectName name = new ObjectName("*:type=Connector,*"); @@ -107,6 +107,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen AddressImpl address = (AddressImpl) fromHeader.getAddress(); SipUri uri = (SipUri) address.getURI(); String deviceId = uri.getUser(); + logger.info("[注册请求] 设备:{}, 开始处理: {}", deviceId, requestAddress); Device device = deviceService.getDevice(deviceId); RemoteAddressInfo remoteAddressInfo = SipUtils.getRemoteAddressFromRequest(request, @@ -115,7 +116,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen if (device != null && device.getSipTransactionInfo() != null && request.getCallIdHeader().getCallId().equals(device.getSipTransactionInfo().getCallId())) { - logger.info("[注册请求] 注册续订: {}", device.getDeviceId()); + logger.info("[注册请求] 设备:{}, 注册续订: {}",device.getDeviceId(), device.getDeviceId()); device.setExpires(request.getExpires().getExpires()); device.setIp(remoteAddressInfo.getIp()); device.setPort(remoteAddressInfo.getPort()); @@ -135,7 +136,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen String password = (device != null && !ObjectUtils.isEmpty(device.getPassword()))? device.getPassword() : sipConfig.getPassword(); AuthorizationHeader authHead = (AuthorizationHeader) request.getHeader(AuthorizationHeader.NAME); if (authHead == null && !ObjectUtils.isEmpty(password)) { - logger.info("[注册请求] 回复401: {}", requestAddress); + logger.info("[注册请求] 设备:{}, 回复401: {}",deviceId, requestAddress); response = getMessageFactory().createResponse(Response.UNAUTHORIZED, request); new DigestServerAuthenticationHelper().generateChallenge(getHeaderFactory(), response, sipConfig.getDomain()); sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), response); @@ -150,7 +151,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen // 注册失败 response = getMessageFactory().createResponse(Response.FORBIDDEN, request); response.setReasonPhrase("wrong password"); - logger.info("[注册请求] 密码/SIP服务器ID错误, 回复403: {}", requestAddress); + logger.info("[注册请求] 设备:{}, 密码/SIP服务器ID错误, 回复403: {}", deviceId, requestAddress); sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), response); return; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java index a667c67d8..36e5df299 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java @@ -10,7 +10,6 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessag import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.UJson; -import com.genersoft.iot.vmp.utils.redis.RedisUtil; import gov.nist.javax.sip.message.SIPRequest; import org.dom4j.Element; import org.slf4j.Logger; @@ -18,6 +17,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; @@ -29,6 +29,7 @@ import javax.sip.message.Response; import java.text.ParseException; import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; @@ -57,6 +58,9 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent @Autowired private ThreadPoolTaskExecutor taskExecutor; + @Autowired + private RedisTemplate redisTemplate; + private Long recordInfoTtl = 1800L; @Override @@ -130,10 +134,11 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent .collect(Collectors.toMap(record -> record.getStartTime()+ record.getEndTime(), UJson::writeJson)); // 获取任务结果数据 String resKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_PRE + channelId + sn; - RedisUtil.hmset(resKey, map, recordInfoTtl); + redisTemplate.opsForHash().putAll(resKey, map); + redisTemplate.expire(resKey, recordInfoTtl, TimeUnit.SECONDS); String resCountKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_COUNT_PRE + channelId + sn; - long incr = RedisUtil.incr(resCountKey, map.size()); - RedisUtil.expire(resCountKey, recordInfoTtl); + long incr = redisTemplate.opsForValue().increment(resCountKey, map.size()); + redisTemplate.expire(resCountKey, recordInfoTtl, TimeUnit.SECONDS); recordInfo.setRecordList(recordList); recordInfo.setCount(Math.toIntExact(incr)); eventPublisher.recordEndEventPush(recordInfo); @@ -141,7 +146,7 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent return; } // 已接收完成 - List resList = RedisUtil.hmget(resKey).values().stream().map(e -> UJson.readJson(e.toString(), RecordItem.class)).collect(Collectors.toList()); + List resList = redisTemplate.opsForHash().entries(resKey).values().stream().map(e -> UJson.readJson(e.toString(), RecordItem.class)).collect(Collectors.toList()); if (resList.size() < sumNum) { return; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java index 0294ba27a..ff1ccc603 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java @@ -92,7 +92,7 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract { WWWAuthenticateHeader www = (WWWAuthenticateHeader)response.getHeader(WWWAuthenticateHeader.NAME); SipTransactionInfo sipTransactionInfo = new SipTransactionInfo(response); try { - sipCommanderForPlatform.register(parentPlatform, sipTransactionInfo, www, null, null, true, platformRegisterInfo.isRegister()); + sipCommanderForPlatform.register(parentPlatform, sipTransactionInfo, www, null, null, platformRegisterInfo.isRegister()); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] 国标级联 再次注册: {}", e.getMessage()); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index dc495f9a3..c38c3134a 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -10,7 +10,6 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.session.SsrcConfig; -import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; @@ -33,6 +32,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionDefinition; @@ -75,13 +75,11 @@ public class MediaServerServiceImpl implements IMediaServerService { private MediaServerMapper mediaServerMapper; @Autowired - DataSourceTransactionManager dataSourceTransactionManager; + private DataSourceTransactionManager dataSourceTransactionManager; @Autowired - TransactionDefinition transactionDefinition; + private TransactionDefinition transactionDefinition; - @Autowired - private VideoStreamSessionManager streamSession; @Autowired private ZLMRTPServerFactory zlmrtpServerFactory; @@ -95,6 +93,9 @@ public class MediaServerServiceImpl implements IMediaServerService { @Autowired private IRedisCatchStorage redisCatchStorage; + @Autowired + private RedisTemplate redisTemplate; + /** * 初始化 */ @@ -109,12 +110,13 @@ public class MediaServerServiceImpl implements IMediaServerService { if (mediaServerItem.getSsrcConfig() == null) { SsrcConfig ssrcConfig = new SsrcConfig(mediaServerItem.getId(), null, sipConfig.getDomain()); mediaServerItem.setSsrcConfig(ssrcConfig); - RedisUtil.set(VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItem.getId(), mediaServerItem); + redisTemplate.opsForValue().set(VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItem.getId(), mediaServerItem); } // 查询redis是否存在此mediaServer String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItem.getId(); - if (!RedisUtil.hasKey(key)) { - RedisUtil.set(key, mediaServerItem); + Boolean hasKey = redisTemplate.hasKey(key); + if (hasKey != null && ! hasKey) { + redisTemplate.opsForValue().set(key, mediaServerItem); } } @@ -160,7 +162,7 @@ public class MediaServerServiceImpl implements IMediaServerService { } else { rtpServerPort = mediaServerItem.getRtpProxyPort(); } - RedisUtil.set(key, mediaServerItem); + redisTemplate.opsForValue().set(key, mediaServerItem); return new SSRCInfo(rtpServerPort, ssrc, streamId); } } @@ -194,7 +196,7 @@ public class MediaServerServiceImpl implements IMediaServerService { ssrcConfig.releaseSsrc(ssrc); mediaServerItem.setSsrcConfig(ssrcConfig); String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItem.getId(); - RedisUtil.set(key, mediaServerItem); + redisTemplate.opsForValue().set(key, mediaServerItem); } /** @@ -203,7 +205,7 @@ public class MediaServerServiceImpl implements IMediaServerService { @Override public void clearRTPServer(MediaServerItem mediaServerItem) { mediaServerItem.setSsrcConfig(new SsrcConfig(mediaServerItem.getId(), null, sipConfig.getDomain())); - RedisUtil.zAdd(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(), mediaServerItem.getId(), 0); + redisTemplate.opsForZSet().add(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(), mediaServerItem.getId(), 0); } @@ -225,22 +227,22 @@ public class MediaServerServiceImpl implements IMediaServerService { ); } String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItemInDataBase.getId(); - RedisUtil.set(key, mediaServerItemInDataBase); + redisTemplate.opsForValue().set(key, mediaServerItemInDataBase); } @Override public List getAll() { List result = new ArrayList<>(); - List mediaServerKeys = RedisUtil.scan(String.format("%S*", VideoManagerConstants.MEDIA_SERVER_PREFIX+ userSetting.getServerId() + "_" )); + List mediaServerKeys = RedisUtil.scan(redisTemplate, String.format("%S*", VideoManagerConstants.MEDIA_SERVER_PREFIX+ userSetting.getServerId() + "_" )); String onlineKey = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(); for (Object mediaServerKey : mediaServerKeys) { String key = (String) mediaServerKey; - MediaServerItem mediaServerItem = JsonUtil.redisJsonToObject(key, MediaServerItem.class); + MediaServerItem mediaServerItem = JsonUtil.redisJsonToObject(redisTemplate, key, MediaServerItem.class); if (Objects.isNull(mediaServerItem)) { continue; } // 检查状态 - Double aDouble = RedisUtil.zScore(onlineKey, mediaServerItem.getId()); + Double aDouble = redisTemplate.opsForZSet().score(onlineKey, mediaServerItem.getId()); if (aDouble != null) { mediaServerItem.setStatus(true); } @@ -266,13 +268,14 @@ public class MediaServerServiceImpl implements IMediaServerService { @Override public List getAllOnline() { String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(); - Set mediaServerIdSet = RedisUtil.zRevRange(key, 0, -1); + Set mediaServerIdSet = redisTemplate.opsForZSet().reverseRange(key, 0, -1); List result = new ArrayList<>(); if (mediaServerIdSet != null && mediaServerIdSet.size() > 0) { - for (String mediaServerId : mediaServerIdSet) { - String serverKey = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerId; - result.add((MediaServerItem) RedisUtil.get(serverKey)); + for (Object mediaServerId : mediaServerIdSet) { + String mediaServerIdStr = (String) mediaServerId; + String serverKey = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerIdStr; + result.add((MediaServerItem) redisTemplate.opsForValue().get(serverKey)); } } Collections.reverse(result); @@ -290,7 +293,7 @@ public class MediaServerServiceImpl implements IMediaServerService { return null; } String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerId; - return JsonUtil.redisJsonToObject(key, MediaServerItem.class); + return JsonUtil.redisJsonToObject(redisTemplate, key, MediaServerItem.class); } @@ -303,7 +306,7 @@ public class MediaServerServiceImpl implements IMediaServerService { @Override public void clearMediaServerForOnline() { String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(); - RedisUtil.del(key); + redisTemplate.delete(key); } @Override @@ -403,16 +406,16 @@ public class MediaServerServiceImpl implements IMediaServerService { } mediaServerMapper.update(serverItem); String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + zlmServerConfig.getGeneralMediaServerId(); - if (RedisUtil.get(key) == null) { + if (redisTemplate.opsForValue().get(key) == null) { SsrcConfig ssrcConfig = new SsrcConfig(zlmServerConfig.getGeneralMediaServerId(), null, sipConfig.getDomain()); serverItem.setSsrcConfig(ssrcConfig); }else { - MediaServerItem mediaServerItemInRedis = JsonUtil.redisJsonToObject(key, MediaServerItem.class); + MediaServerItem mediaServerItemInRedis = JsonUtil.redisJsonToObject(redisTemplate, key, MediaServerItem.class); if (Objects.nonNull(mediaServerItemInRedis)) { serverItem.setSsrcConfig(mediaServerItemInRedis.getSsrcConfig()); } } - RedisUtil.set(key, serverItem); + redisTemplate.opsForValue().set(key, serverItem); resetOnlineServerItem(serverItem); @@ -475,15 +478,15 @@ public class MediaServerServiceImpl implements IMediaServerService { // 更新缓存 String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(); // 使用zset的分数作为当前并发量, 默认值设置为0 - if (RedisUtil.zScore(key, serverItem.getId()) == null) { // 不存在则设置默认值 已存在则重置 - RedisUtil.zAdd(key, serverItem.getId(), 0L); + if (redisTemplate.opsForZSet().score(key, serverItem.getId()) == null) { // 不存在则设置默认值 已存在则重置 + redisTemplate.opsForZSet().add(key, serverItem.getId(), 0L); // 查询服务流数量 zlmresTfulUtils.getMediaList(serverItem, null, null, "rtsp",(mediaList ->{ Integer code = mediaList.getInteger("code"); if (code == 0) { JSONArray data = mediaList.getJSONArray("data"); if (data != null) { - RedisUtil.zAdd(key, serverItem.getId(), data.size()); + redisTemplate.opsForZSet().add(key, serverItem.getId(), data.size()); } } })); @@ -499,14 +502,14 @@ public class MediaServerServiceImpl implements IMediaServerService { return; } String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(); - RedisUtil.zIncrScore(key, mediaServerId, 1); + redisTemplate.opsForZSet().incrementScore(key, mediaServerId, 1); } @Override public void removeCount(String mediaServerId) { String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(); - RedisUtil.zIncrScore(key, mediaServerId, - 1); + redisTemplate.opsForZSet().incrementScore(key, mediaServerId, - 1); } /** @@ -516,16 +519,14 @@ public class MediaServerServiceImpl implements IMediaServerService { @Override public MediaServerItem getMediaServerForMinimumLoad(Boolean hasAssist) { String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(); - - if (RedisUtil.zSize(key) == null || RedisUtil.zSize(key) == 0) { - if (RedisUtil.zSize(key) == null || RedisUtil.zSize(key) == 0) { - logger.info("获取负载最低的节点时无在线节点"); - return null; - } + Long size = redisTemplate.opsForZSet().zCard(key); + if (size == null || size == 0) { + logger.info("获取负载最低的节点时无在线节点"); + return null; } // 获取分数最低的,及并发最低的 - Set objects = RedisUtil.zRange(key, 0, -1); + Set objects = redisTemplate.opsForZSet().range(key, 0, -1); ArrayList mediaServerObjectS = new ArrayList<>(objects); MediaServerItem mediaServerItem = null; if (hasAssist == null) { @@ -688,9 +689,9 @@ public class MediaServerServiceImpl implements IMediaServerService { @Override public void delete(String id) { - RedisUtil.zRemove(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(), id); + redisTemplate.opsForZSet().remove(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(), id); String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + id; - RedisUtil.del(key); + redisTemplate.delete(key); } @Override public void deleteDb(String id){ @@ -713,7 +714,7 @@ public class MediaServerServiceImpl implements IMediaServerService { SsrcConfig ssrcConfig = new SsrcConfig(mediaServerItem.getId(), null, sipConfig.getDomain()); mediaServerItem.setSsrcConfig(ssrcConfig); String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItem.getId(); - RedisUtil.set(key, mediaServerItem); + redisTemplate.opsForValue().set(key, mediaServerItem); clearRTPServer(mediaServerItem); } final String zlmKeepaliveKey = zlmKeepaliveKeyPrefix + mediaServerItem.getId(); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java index 27185139a..ffe272bed 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java @@ -168,7 +168,7 @@ public class PlatformServiceImpl implements IPlatformService { // 注销旧的 try { if (parentPlatformOld.isStatus()) { - logger.info("保存平台{}时发现救平台在线,发送注销命令", parentPlatform.getDeviceGBId()); + logger.info("保存平台{}时发现救平台在线,发送注销命令", parentPlatformOld.getServerGBId()); commanderForPlatform.unregister(parentPlatformOld, parentPlatformCatchOld.getSipTransactionInfo(), null, eventResult -> { logger.info("[国标级联] 注销成功, 平台:{}", parentPlatformOld.getServerGBId()); }); @@ -275,7 +275,7 @@ public class PlatformServiceImpl implements IPlatformService { // 心跳成功 // 清空之前的心跳超时计数 ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); - if (platformCatch.getKeepAliveReply() > 0) { + if (platformCatch != null && platformCatch.getKeepAliveReply() > 0) { platformCatch.setKeepAliveReply(0); redisCatchStorage.updatePlatformCatchInfo(platformCatch); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 4d8ed2798..a67b951fd 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import org.springframework.util.ObjectUtils; @@ -131,6 +132,9 @@ public class PlayServiceImpl implements IPlayService { @Autowired private ZlmHttpHookSubscribe hookSubscribe; + @Autowired + private RedisTemplate redisTemplate; + @Override public void play(MediaServerItem mediaServerItem, String deviceId, String channelId, @@ -1193,7 +1197,7 @@ public class PlayServiceImpl implements IPlayService { throw new ServiceException("streamId不存在"); } streamInfo.setPause(true); - RedisUtil.set(key, streamInfo); + redisTemplate.opsForValue().set(key, streamInfo); MediaServerItem mediaServerItem = mediaServerService.getOne(streamInfo.getMediaServerId()); if (null == mediaServerItem) { logger.warn("mediaServer 不存在!"); @@ -1217,7 +1221,7 @@ public class PlayServiceImpl implements IPlayService { throw new ServiceException("streamId不存在"); } streamInfo.setPause(false); - RedisUtil.set(key, streamInfo); + redisTemplate.opsForValue().set(key, streamInfo); MediaServerItem mediaServerItem = mediaServerService.getOne(streamInfo.getMediaServerId()); if (null == mediaServerItem) { logger.warn("mediaServer 不存在!"); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java index c37264785..868e861d8 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java @@ -2,19 +2,17 @@ package com.genersoft.iot.vmp.service.redisMsg; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; -import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.*; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,6 +20,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; @@ -68,7 +67,7 @@ public class RedisGbPlayMsgListener implements MessageListener { @Autowired - private ZLMMediaListManager zlmMediaListManager; + private RedisTemplate redisTemplate; @Autowired private ZLMRTPServerFactory zlmrtpServerFactory; @@ -76,14 +75,10 @@ public class RedisGbPlayMsgListener implements MessageListener { @Autowired private IMediaServerService mediaServerService; - @Autowired - private IRedisCatchStorage redisCatchStorage; @Autowired private DynamicTask dynamicTask; - @Autowired - private ZLMMediaListManager mediaListManager; @Autowired private ZlmHttpHookSubscribe subscribe; @@ -246,7 +241,7 @@ public class RedisGbPlayMsgListener implements MessageListener { WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId, WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, result); JSONObject jsonObject = (JSONObject)JSON.toJSON(response); - RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); } /** @@ -265,7 +260,7 @@ public class RedisGbPlayMsgListener implements MessageListener { WvpRedisMsgCmd.GET_SEND_ITEM, serial, result); JSONObject jsonObject = (JSONObject)JSON.toJSON(response); - RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); return; } // 确定流是否在线 @@ -288,7 +283,7 @@ public class RedisGbPlayMsgListener implements MessageListener { userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result ); JSONObject jsonObject = (JSONObject)JSON.toJSON(response); - RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); }, userSetting.getPlatformPlayTimeout()); // 添加订阅 @@ -302,7 +297,12 @@ public class RedisGbPlayMsgListener implements MessageListener { MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, content.getApp(), content.getStream(), content.getChannelId(), content.getPlatformId(), content.getPlatformName(), content.getServerId(), content.getMediaServerId()); - redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); + + String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED; + logger.info("[redis发送通知] 推流被请求 {}: {}/{}", key, messageForPushChannel.getApp(), messageForPushChannel.getStream()); + redisTemplate.convertAndSend(key, JSON.toJSON(messageForPushChannel)); + +// redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); } } @@ -327,7 +327,7 @@ public class RedisGbPlayMsgListener implements MessageListener { userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result ); JSONObject jsonObject = (JSONObject)JSON.toJSON(response); - RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); } /** @@ -364,7 +364,7 @@ public class RedisGbPlayMsgListener implements MessageListener { wvpResult.setMsg("timeout"); errorCallback.handler(wvpResult); }, userSetting.getPlatformPlayTimeout()); - RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); } /** @@ -389,6 +389,6 @@ public class RedisGbPlayMsgListener implements MessageListener { callbacksForStartSendRtpStream.remove(key); callbacksForError.remove(key); }); - RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java index 358836c41..3c77024b5 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java @@ -1,15 +1,18 @@ package com.genersoft.iot.vmp.storager.dao; -import java.util.List; - import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; -import org.apache.ibatis.annotations.*; +import org.apache.ibatis.annotations.Delete; +import org.apache.ibatis.annotations.Insert; +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Select; + +import java.util.List; @Mapper public interface DeviceMobilePositionMapper { @Insert("INSERT INTO device_mobile_position (deviceId,channelId, deviceName, time, longitude, latitude, altitude, speed, direction, reportSource, longitudeGcj02, latitudeGcj02, longitudeWgs84, latitudeWgs84, createTime) " + - "VALUES (#{deviceId},#{channelId}, #{deviceName}, #{time}, #{longitude}, #{latitude}, #{altitude}, #{speed}, #{direction}, #{reportSource}, #{longitudeGcj02}, #{latitudeGcj02}, #{longitudeWgs84}, #{latitudeWgs84}, #{createTime})") + "VALUES (#{deviceId}, #{channelId}, #{deviceName}, #{time}, #{longitude}, #{latitude}, #{altitude}, #{speed}, #{direction}, #{reportSource}, #{longitudeGcj02}, #{latitudeGcj02}, #{longitudeWgs84}, #{latitudeWgs84}, #{createTime})") int insertNewPosition(MobilePosition mobilePosition); @Select(value = {"