去除redis工具类直接使用RedisTemplate存取数据

This commit is contained in:
648540858
2023-04-03 10:26:55 +08:00
parent 053cd130aa
commit 4f2d47385d
18 changed files with 1251 additions and 1177 deletions

View File

@@ -8,16 +8,15 @@ import com.genersoft.iot.vmp.utils.JsonUtil;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import gov.nist.javax.sip.message.SIPResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
import java.util.List;
/**
* @description:视频流session管理器管理视频预览、预览回放的通信句柄
* @author: swwheihei
* @date: 2020年5月13日 下午4:03:02
/**
* 视频流session管理器管理视频预览、预览回放的通信句柄
*/
@Component
public class VideoStreamSessionManager {
@@ -25,6 +24,9 @@ public class VideoStreamSessionManager {
@Autowired
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
public enum SessionType {
play,
playback,
@@ -52,7 +54,7 @@ public class VideoStreamSessionManager {
ssrcTransaction.setMediaServerId(mediaServerId);
ssrcTransaction.setType(type);
RedisUtil.set(VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId()
redisTemplate.opsForValue().set(VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId()
+ "_" + deviceId + "_" + channelId + "_" + callId + "_" + stream, ssrcTransaction);
}
@@ -71,11 +73,11 @@ public class VideoStreamSessionManager {
stream ="*";
}
String key = VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + "_" + deviceId + "_" + channelId + "_" + callId+ "_" + stream;
List<Object> scanResult = RedisUtil.scan(key);
List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
if (scanResult.size() == 0) {
return null;
}
return (SsrcTransaction)RedisUtil.get((String) scanResult.get(0));
return (SsrcTransaction)redisTemplate.opsForValue().get(scanResult.get(0));
}
public List<SsrcTransaction> getSsrcTransactionForAll(String deviceId, String channelId, String callId, String stream){
@@ -92,13 +94,13 @@ public class VideoStreamSessionManager {
stream ="*";
}
String key = VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + "_" + deviceId + "_" + channelId + "_" + callId+ "_" + stream;
List<Object> scanResult = RedisUtil.scan(key);
List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
if (scanResult.size() == 0) {
return null;
}
List<SsrcTransaction> result = new ArrayList<>();
for (Object keyObj : scanResult) {
result.add((SsrcTransaction)RedisUtil.get((String) keyObj));
result.add((SsrcTransaction)redisTemplate.opsForValue().get(keyObj));
}
return result;
}
@@ -124,17 +126,17 @@ public class VideoStreamSessionManager {
if (ssrcTransaction == null) {
return;
}
RedisUtil.del(VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + "_"
redisTemplate.delete(VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + "_"
+ deviceId + "_" + channelId + "_" + ssrcTransaction.getCallId() + "_" + ssrcTransaction.getStream());
}
public List<SsrcTransaction> getAllSsrc() {
List<Object> ssrcTransactionKeys = RedisUtil.scan(String.format("%s_*_*_*_*", VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX+ userSetting.getServerId()));
List<Object> ssrcTransactionKeys = RedisUtil.scan(redisTemplate, String.format("%s_*_*_*_*", VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX+ userSetting.getServerId()));
List<SsrcTransaction> result= new ArrayList<>();
for (int i = 0; i < ssrcTransactionKeys.size(); i++) {
String key = (String)ssrcTransactionKeys.get(i);
SsrcTransaction ssrcTransaction = JsonUtil.redisJsonToObject(key, SsrcTransaction.class);
for (Object ssrcTransactionKey : ssrcTransactionKeys) {
String key = (String) ssrcTransactionKey;
SsrcTransaction ssrcTransaction = JsonUtil.redisJsonToObject(redisTemplate, key, SsrcTransaction.class);
result.add(ssrcTransaction);
}
return result;

View File

@@ -20,7 +20,7 @@ public interface ISIPCommanderForPlatform {
void register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException;
void register(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException;
void register(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo, WWWAuthenticateHeader www, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain, boolean isRegister) throws SipException, InvalidArgumentException, ParseException;
void register(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo, WWWAuthenticateHeader www, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean isRegister) throws SipException, InvalidArgumentException, ParseException;
/**
* 向上级平台注销

View File

@@ -45,7 +45,7 @@ public class SIPRequestHeaderPlarformProvider {
@Autowired
private IRedisCatchStorage redisCatchStorage;
public Request createRegisterRequest(@NotNull ParentPlatform parentPlatform, long CSeq, String fromTag, String toTag, CallIdHeader callIdHeader, boolean isRegister) throws ParseException, InvalidArgumentException, PeerUnavailableException {
public Request createRegisterRequest(@NotNull ParentPlatform parentPlatform, long CSeq, String fromTag, String toTag, CallIdHeader callIdHeader, int expires) throws ParseException, InvalidArgumentException, PeerUnavailableException {
Request request = null;
String sipAddress = parentPlatform.getDeviceIp() + ":" + parentPlatform.getDevicePort();
//请求行
@@ -78,8 +78,8 @@ public class SIPRequestHeaderPlarformProvider {
.createSipURI(parentPlatform.getDeviceGBId(), sipAddress));
request.addHeader(sipLayer.getSipFactory().createHeaderFactory().createContactHeader(concatAddress));
ExpiresHeader expires = sipLayer.getSipFactory().createHeaderFactory().createExpiresHeader(isRegister ? parentPlatform.getExpires() : 0);
request.addHeader(expires);
ExpiresHeader expiresHeader = sipLayer.getSipFactory().createHeaderFactory().createExpiresHeader(expires);
request.addHeader(expiresHeader);
request.addHeader(SipUtils.createUserAgentHeader(sipLayer.getSipFactory(), gitUtil));
@@ -87,10 +87,10 @@ public class SIPRequestHeaderPlarformProvider {
}
public Request createRegisterRequest(@NotNull ParentPlatform parentPlatform, String fromTag, String toTag,
WWWAuthenticateHeader www , CallIdHeader callIdHeader, boolean isRegister) throws ParseException, PeerUnavailableException, InvalidArgumentException {
WWWAuthenticateHeader www , CallIdHeader callIdHeader, int expires) throws ParseException, PeerUnavailableException, InvalidArgumentException {
Request registerRequest = createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(), fromTag, toTag, callIdHeader, isRegister);
Request registerRequest = createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(), fromTag, toTag, callIdHeader, expires);
SipURI requestURI = sipLayer.getSipFactory().createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP() + ":" + parentPlatform.getServerPort());
if (www == null) {
AuthorizationHeader authorizationHeader = sipLayer.getSipFactory().createHeaderFactory().createAuthorizationHeader("Digest");

View File

@@ -71,23 +71,23 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
@Override
public void register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException {
register(parentPlatform, null, null, errorEvent, okEvent, false, true);
register(parentPlatform, null, null, errorEvent, okEvent, true);
}
@Override
public void register(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException {
register(parentPlatform, sipTransactionInfo, null, errorEvent, okEvent, false, true);
register(parentPlatform, sipTransactionInfo, null, errorEvent, okEvent, true);
}
@Override
public void unregister(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException {
register(parentPlatform, sipTransactionInfo, null, errorEvent, okEvent, false, false);
register(parentPlatform, sipTransactionInfo, null, errorEvent, okEvent, false);
}
@Override
public void register(ParentPlatform parentPlatform, @Nullable SipTransactionInfo sipTransactionInfo, @Nullable WWWAuthenticateHeader www,
SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain, boolean isRegister) throws SipException, InvalidArgumentException, ParseException {
SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean isRegister) throws SipException, InvalidArgumentException, ParseException {
Request request;
CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(parentPlatform.getDeviceIp(),parentPlatform.getTransport());
@@ -105,10 +105,10 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
}
}
if (!registerAgain ) {
if (www == null ) {
request = headerProviderPlatformProvider.createRegisterRequest(parentPlatform,
redisCatchStorage.getCSEQ(), fromTag,
toTag, callIdHeader, isRegister);
toTag, callIdHeader, isRegister? parentPlatform.getExpires() : 0);
// 将 callid 写入缓存, 等注册成功可以更新状态
String callIdFromHeader = callIdHeader.getCallId();
redisCatchStorage.updatePlatformRegisterInfo(callIdFromHeader, PlatformRegisterInfo.getInstance(parentPlatform.getServerGBId(), isRegister));
@@ -126,7 +126,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
});
}else {
request = headerProviderPlatformProvider.createRegisterRequest(parentPlatform, fromTag, toTag, www, callIdHeader, isRegister);
request = headerProviderPlatformProvider.createRegisterRequest(parentPlatform, fromTag, toTag, www, callIdHeader, isRegister? parentPlatform.getExpires() : 0);
}
sipSender.transmitRequest(parentPlatform.getDeviceIp(), request, null, okEvent);
@@ -497,10 +497,10 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
private void sendNotify(ParentPlatform parentPlatform, String catalogXmlContent,
SubscribeInfo subscribeInfo, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent )
throws SipException, ParseException, InvalidArgumentException {
MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipLayer.getSipFactory().createMessageFactory();
MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipLayer.getSipFactory().createMessageFactory();
String characterSet = parentPlatform.getCharacterSet();
// 设置编码, 防止中文乱码
messageFactory.setDefaultContentEncodingCharset(characterSet);
// 设置编码, 防止中文乱码
messageFactory.setDefaultContentEncodingCharset(characterSet);
SIPRequest notifyRequest = headerProviderPlatformProvider.createNotifyRequest(parentPlatform, catalogXmlContent, subscribeInfo);

View File

@@ -10,7 +10,6 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessag
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.UJson;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import gov.nist.javax.sip.message.SIPRequest;
import org.dom4j.Element;
import org.slf4j.Logger;
@@ -18,6 +17,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
@@ -29,6 +29,7 @@ import javax.sip.message.Response;
import java.text.ParseException;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
@@ -57,6 +58,9 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private Long recordInfoTtl = 1800L;
@Override
@@ -130,10 +134,11 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
.collect(Collectors.toMap(record -> record.getStartTime()+ record.getEndTime(), UJson::writeJson));
// 获取任务结果数据
String resKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_PRE + channelId + sn;
RedisUtil.hmset(resKey, map, recordInfoTtl);
redisTemplate.opsForHash().putAll(resKey, map);
redisTemplate.expire(resKey, recordInfoTtl, TimeUnit.SECONDS);
String resCountKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_COUNT_PRE + channelId + sn;
long incr = RedisUtil.incr(resCountKey, map.size());
RedisUtil.expire(resCountKey, recordInfoTtl);
long incr = redisTemplate.opsForValue().increment(resCountKey, map.size());
redisTemplate.expire(resCountKey, recordInfoTtl, TimeUnit.SECONDS);
recordInfo.setRecordList(recordList);
recordInfo.setCount(Math.toIntExact(incr));
eventPublisher.recordEndEventPush(recordInfo);
@@ -141,7 +146,7 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
return;
}
// 已接收完成
List<RecordItem> resList = RedisUtil.hmget(resKey).values().stream().map(e -> UJson.readJson(e.toString(), RecordItem.class)).collect(Collectors.toList());
List<RecordItem> resList = redisTemplate.opsForHash().entries(resKey).values().stream().map(e -> UJson.readJson(e.toString(), RecordItem.class)).collect(Collectors.toList());
if (resList.size() < sumNum) {
return;
}

View File

@@ -92,7 +92,7 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract {
WWWAuthenticateHeader www = (WWWAuthenticateHeader)response.getHeader(WWWAuthenticateHeader.NAME);
SipTransactionInfo sipTransactionInfo = new SipTransactionInfo(response);
try {
sipCommanderForPlatform.register(parentPlatform, sipTransactionInfo, www, null, null, true, platformRegisterInfo.isRegister());
sipCommanderForPlatform.register(parentPlatform, sipTransactionInfo, www, null, null, platformRegisterInfo.isRegister());
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 再次注册: {}", e.getMessage());
}