合并主分支

This commit is contained in:
648540858
2022-01-13 16:39:32 +08:00
70 changed files with 2811 additions and 14551 deletions

View File

@@ -58,9 +58,15 @@ public class VideoManagerConstants {
public static final String SIP_CSEQ_PREFIX = "VMP_SIP_CSEQ_";
public static final String SIP_SN_PREFIX = "VMP_SIP_SN_";
public static final String SIP_SUBSCRIBE_PREFIX = "SIP_SUBSCRIBE_";
//************************** redis 消息*********************************
public static final String WVP_MSG_STREAM_CHANGE_PREFIX = "WVP_MSG_STREAM_CHANGE_";
public static final String WVP_MSG_GPS_PREFIX = "WVP_MSG_GPS_";
//************************** 第三方 ****************************************
public static final String WVP_STREAM_GB_ID_PREFIX = "memberNo_";
public static final String WVP_STREAM_GPS_MSG_PREFIX = "WVP_STREAM_GPS_MSG_";
}

View File

@@ -1,12 +1,16 @@
package com.genersoft.iot.vmp.conf;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.service.impl.RedisGPSMsgListener;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@@ -41,6 +45,9 @@ public class RedisConfig extends CachingConfigurerSupport {
@Value("${spring.redis.poolMaxWait:5}")
private int poolMaxWait;
@Autowired
private RedisGPSMsgListener redisGPSMsgListener;
@Bean
public JedisPool jedisPool() {
if (StringUtils.isBlank(password)) {
@@ -85,6 +92,7 @@ public class RedisConfig extends CachingConfigurerSupport {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(redisGPSMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_GPS_PREFIX));
return container;
}

View File

@@ -0,0 +1,41 @@
package com.genersoft.iot.vmp.conf;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.util.StringUtils;
import java.util.Properties;
public class RedisKeyExpirationEventMessageListener extends KeyExpirationEventMessageListener {
private UserSetup userSetup;
private RedisMessageListenerContainer listenerContainer;
private String keyspaceNotificationsConfigParameter = "EA";
public RedisKeyExpirationEventMessageListener(RedisMessageListenerContainer listenerContainer, UserSetup userSetup) {
super(listenerContainer);
this.listenerContainer = listenerContainer;
this.userSetup = userSetup;
}
@Override
public void init() {
if (!userSetup.getRedisConfig()) {
// 配置springboot默认Config为空即不让应用去修改redis的默认配置因为Redis服务出于安全会禁用CONFIG命令给远程用户使用
setKeyspaceNotificationsConfigParameter("");
}else {
RedisConnection connection = this.listenerContainer.getConnectionFactory().getConnection();
Properties config = connection.getConfig("notify-keyspace-events");
try {
if (!config.getProperty("notify-keyspace-events").equals(keyspaceNotificationsConfigParameter)) {
connection.setConfig("notify-keyspace-events", keyspaceNotificationsConfigParameter);
}
} finally {
connection.close();
}
}
super.init();
}
}

View File

@@ -0,0 +1,8 @@
package com.genersoft.iot.vmp.gb28181.bean;
public class CmdType {
public static final String CATALOG = "Catalog";
public static final String ALARM = "Alarm";
public static final String MOBILE_POSITION = "MobilePosition";
}

View File

@@ -0,0 +1,21 @@
package com.genersoft.iot.vmp.gb28181.bean;
import javax.sip.Dialog;
import java.util.EventObject;
public class DeviceNotFoundEvent extends EventObject {
/**
* Constructs a prototypical Event.
*
* @param dialog
* @throws IllegalArgumentException if source is null.
*/
public DeviceNotFoundEvent(Dialog dialog) {
super(dialog);
}
public Dialog getDialog() {
return (Dialog)super.getSource();
}
}

View File

@@ -109,6 +109,11 @@ public class ParentPlatform {
*/
private boolean shareAllLiveStream;
/**
* 默认目录Id,自动添加的通道多放在这个目录下
*/
private String catalogId;
public Integer getId() {
return id;
}
@@ -277,4 +282,12 @@ public class ParentPlatform {
public void setShareAllLiveStream(boolean shareAllLiveStream) {
this.shareAllLiveStream = shareAllLiveStream;
}
public String getCatalogId() {
return catalogId;
}
public void setCatalogId(String catalogId) {
this.catalogId = catalogId;
}
}

View File

@@ -0,0 +1,71 @@
package com.genersoft.iot.vmp.gb28181.bean;
public class PlatformCatalog {
private String id;
private String name;
private String platformId;
private String parentId;
private int childrenCount; // 子节点数
private int type; // 0 目录, 1 国标通道, 2 直播流
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPlatformId() {
return platformId;
}
public void setPlatformId(String platformId) {
this.platformId = platformId;
}
public String getParentId() {
return parentId;
}
public void setParentId(String parentId) {
this.parentId = parentId;
}
public int getChildrenCount() {
return childrenCount;
}
public void setChildrenCount(int childrenCount) {
this.childrenCount = childrenCount;
}
public int getType() {
return type;
}
public void setType(int type) {
this.type = type;
}
public void setTypeForCatalog() {
this.type = 0;
}
public void setTypeForGb() {
this.type = 1;
}
public void setTypeForStream() {
this.type = 2;
}
}

View File

@@ -4,6 +4,7 @@ public class PlatformGbStream {
private String app;
private String stream;
private String platformId;
private String catalogId;
public String getApp() {
return app;
@@ -29,4 +30,11 @@ public class PlatformGbStream {
this.platformId = platformId;
}
public String getCatalogId() {
return catalogId;
}
public void setCatalogId(String catalogId) {
this.catalogId = catalogId;
}
}

View File

@@ -0,0 +1,78 @@
package com.genersoft.iot.vmp.gb28181.bean;
import javax.sip.RequestEvent;
import javax.sip.header.*;
import javax.sip.message.Request;
public class SubscribeInfo {
public SubscribeInfo() {
}
public SubscribeInfo(RequestEvent evt, String id) {
this.id = id;
Request request = evt.getRequest();
CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME);
this.callId = callIdHeader.getCallId();
FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME);
this.fromTag = fromHeader.getTag();
ExpiresHeader expiresHeader = (ExpiresHeader)request.getHeader(ExpiresHeader.NAME);
this.expires = expiresHeader.getExpires();
this.event = (EventHeader)request.getHeader(EventHeader.NAME);
}
private String id;
private int expires;
private String callId;
private EventHeader event;
private String fromTag;
private String toTag;
public String getId() {
return id;
}
public int getExpires() {
return expires;
}
public String getCallId() {
return callId;
}
public EventHeader getEvent() {
return event;
}
public String getFromTag() {
return fromTag;
}
public void setToTag(String toTag) {
this.toTag = toTag;
}
public String getToTag() {
return toTag;
}
public void setId(String id) {
this.id = id;
}
public void setExpires(int expires) {
this.expires = expires;
}
public void setCallId(String callId) {
this.callId = callId;
}
public void setEvent(EventHeader event) {
this.event = event;
}
public void setFromTag(String fromTag) {
this.fromTag = fromTag;
}
}

View File

@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.gb28181.event;
import com.genersoft.iot.vmp.gb28181.bean.DeviceNotFoundEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
@@ -91,6 +92,13 @@ public class SipSubscribe {
this.statusCode = -1024;
this.callId = dialogTerminatedEvent.getDialog().getCallId().getCallId();
this.dialog = dialogTerminatedEvent.getDialog();
}else if (event instanceof DeviceNotFoundEvent) {
DeviceNotFoundEvent deviceNotFoundEvent = (DeviceNotFoundEvent)event;
this.type = "deviceNotFoundEvent";
this.msg = "设备未找到";
this.statusCode = -1024;
this.callId = deviceNotFoundEvent.getDialog().getCallId().getCallId();
this.dialog = deviceNotFoundEvent.getDialog();
}
}
}

View File

@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.gb28181.event.offline;
import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener;
import com.genersoft.iot.vmp.conf.UserSetup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -7,12 +8,16 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import org.springframework.util.StringUtils;
import java.util.Properties;
/**
* @description:设备心跳超时监听,借助redis过期特性进行监听监听到说明设备心跳超时发送离线事件
@@ -20,7 +25,7 @@ import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
* @date: 2020年5月6日 上午11:35:46
*/
@Component
public class KeepaliveTimeoutListenerForPlatform extends KeyExpirationEventMessageListener {
public class KeepaliveTimeoutListenerForPlatform extends RedisKeyExpirationEventMessageListener {
private Logger logger = LoggerFactory.getLogger(KeepaliveTimeoutListenerForPlatform.class);
@@ -30,17 +35,8 @@ public class KeepaliveTimeoutListenerForPlatform extends KeyExpirationEventMessa
@Autowired
private UserSetup userSetup;
@Override
public void init() {
if (!userSetup.getRedisConfig()) {
// 配置springboot默认Config为空即不让应用去修改redis的默认配置因为Redis服务出于安全会禁用CONFIG命令给远程用户使用
setKeyspaceNotificationsConfigParameter("");
}
super.init();
}
public KeepaliveTimeoutListenerForPlatform(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
public KeepaliveTimeoutListenerForPlatform(RedisMessageListenerContainer listenerContainer, UserSetup userSetup) {
super(listenerContainer, userSetup);
}

View File

@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.gb28181.event.offline;
import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener;
import com.genersoft.iot.vmp.conf.UserSetup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -20,7 +21,7 @@ import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
* @date: 2020年5月6日 上午11:35:46
*/
@Component
public class KeepliveTimeoutListener extends KeyExpirationEventMessageListener {
public class KeepliveTimeoutListener extends RedisKeyExpirationEventMessageListener {
private Logger logger = LoggerFactory.getLogger(KeepliveTimeoutListener.class);
@@ -30,6 +31,10 @@ public class KeepliveTimeoutListener extends KeyExpirationEventMessageListener {
@Autowired
private UserSetup userSetup;
public KeepliveTimeoutListener(RedisMessageListenerContainer listenerContainer, UserSetup userSetup) {
super(listenerContainer, userSetup);
}
@Override
public void init() {
if (!userSetup.getRedisConfig()) {
@@ -39,9 +44,6 @@ public class KeepliveTimeoutListener extends KeyExpirationEventMessageListener {
super.init();
}
public KeepliveTimeoutListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
/**
* 监听失效的keykey格式为keeplive_deviceId

View File

@@ -0,0 +1,52 @@
package com.genersoft.iot.vmp.gb28181.event.subscribe;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import org.checkerframework.checker.units.qual.A;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
/**
* 平台订阅到期事件
*/
@Component
public class SubscribeListenerForPlatform extends RedisKeyExpirationEventMessageListener {
private Logger logger = LoggerFactory.getLogger(SubscribeListenerForPlatform.class);
@Autowired
private UserSetup userSetup;
@Autowired
private DynamicTask dynamicTask;
public SubscribeListenerForPlatform(RedisMessageListenerContainer listenerContainer, UserSetup userSetup) {
super(listenerContainer, userSetup);
}
/**
* 监听失效的key
* @param message
* @param pattern
*/
@Override
public void onMessage(Message message, byte[] pattern) {
// 获取失效的key
String expiredKey = message.toString();
logger.debug(expiredKey);
// 订阅到期
String PLATFORM_KEEPLIVEKEY_PREFIX = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_";
if (expiredKey.startsWith(PLATFORM_KEEPLIVEKEY_PREFIX)) {
// 取消定时任务
dynamicTask.stopCron(expiredKey);
}
}
}

View File

@@ -0,0 +1,70 @@
package com.genersoft.iot.vmp.gb28181.task;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import java.text.SimpleDateFormat;
import java.util.List;
public class GPSSubscribeTask implements Runnable{
private IRedisCatchStorage redisCatchStorage;
private IVideoManagerStorager storager;
private ISIPCommanderForPlatform sipCommanderForPlatform;
private String platformId;
private String sn;
private String key;
private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public GPSSubscribeTask(IRedisCatchStorage redisCatchStorage, ISIPCommanderForPlatform sipCommanderForPlatform, IVideoManagerStorager storager, String platformId, String sn, String key) {
this.redisCatchStorage = redisCatchStorage;
this.storager = storager;
this.platformId = platformId;
this.sn = sn;
this.key = key;
this.sipCommanderForPlatform = sipCommanderForPlatform;
}
@Override
public void run() {
SubscribeInfo subscribe = redisCatchStorage.getSubscribe(key);
if (subscribe != null) {
System.out.println("发送GPS消息");
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId);
if (parentPlatform == null || parentPlatform.isStatus()) {
// TODO 暂时只处理视频流的回复,后续增加对国标设备的支持
List<GbStream> gbStreams = storager.queryGbStreamListInPlatform(platformId);
if (gbStreams.size() > 0) {
for (GbStream gbStream : gbStreams) {
String gbId = gbStream.getGbId();
GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId);
if (gbStream.isStatus()) {
if (gpsMsgInfo != null) {
// 发送GPS消息
sipCommanderForPlatform.sendMobilePosition(parentPlatform, gpsMsgInfo, subscribe);
}else {
// 没有在redis找到新的消息就使用数据库的消息
gpsMsgInfo = new GPSMsgInfo();
gpsMsgInfo.setId(gbId);
gpsMsgInfo.setLat(gbStream.getLongitude());
gpsMsgInfo.setLng(gbStream.getLongitude());
// 发送GPS消息
sipCommanderForPlatform.sendMobilePosition(parentPlatform, gpsMsgInfo, subscribe);
}
}
}
}
}
}
}
}

View File

@@ -94,7 +94,6 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
logger.debug(responseEvent.getResponse().toString());
int status = response.getStatusCode();
if (((status >= 200) && (status < 300)) || status == 401) { // Success!
// ISIPResponseProcessor processor = processorFactory.createResponseProcessor(evt);
CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME);
String method = cseqHeader.getMethod();
ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method);
@@ -108,6 +107,7 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
if (subscribe != null) {
SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
subscribe.response(eventResult);
sipSubscribe.removeOkSubscribe(callIdHeader.getCallId());
}
}
}
@@ -122,6 +122,7 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
if (subscribe != null) {
SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
subscribe.response(eventResult);
sipSubscribe.removeErrorSubscribe(callIdHeader.getCallId());
}
}
}

View File

@@ -2,7 +2,9 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import javax.sip.header.WWWAuthenticateHeader;
@@ -61,4 +63,12 @@ public interface ISIPCommanderForPlatform {
*/
boolean deviceStatusResponse(ParentPlatform parentPlatform, String sn, String fromTag);
/**
* 向上级回复移动位置订阅消息
* @param parentPlatform 平台信息
* @param gpsMsgInfo GPS信息
* @param subscribeInfo 订阅相关的信息
* @return
*/
boolean sendMobilePosition(ParentPlatform parentPlatform, GPSMsgInfo gpsMsgInfo, SubscribeInfo subscribeInfo);
}

View File

@@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import gov.nist.javax.sip.message.MessageFactoryImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -32,6 +33,9 @@ public class SIPRequestHeaderPlarformProvider {
@Autowired
private SipFactory sipFactory;
@Autowired
private IRedisCatchStorage redisCatchStorage;
public Request createKeetpaliveMessageRequest(ParentPlatform parentPlatform, String content, String viaTag, String fromTag, String toTag, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException {
Request request = null;
@@ -57,7 +61,7 @@ public class SIPRequestHeaderPlarformProvider {
// Forwards
MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
// ceq
CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(1L, Request.MESSAGE);
CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.MESSAGE), Request.MESSAGE);
request = sipFactory.createMessageFactory().createRequest(requestURI, Request.MESSAGE, callIdHeader, cSeqHeader, fromHeader,
toHeader, viaHeaders, maxForwards);
@@ -122,7 +126,7 @@ public class SIPRequestHeaderPlarformProvider {
String callId, WWWAuthenticateHeader www , CallIdHeader callIdHeader) throws ParseException, PeerUnavailableException, InvalidArgumentException {
Request registerRequest = createRegisterRequest(parentPlatform, 2L, fromTag, viaTag, callIdHeader);
Request registerRequest = createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(Request.REGISTER), fromTag, viaTag, callIdHeader);
String realm = www.getRealm();
String nonce = www.getNonce();
@@ -208,7 +212,7 @@ public class SIPRequestHeaderPlarformProvider {
// Forwards
MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
// ceq
CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(1L, Request.MESSAGE);
CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.MESSAGE), Request.MESSAGE);
MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipFactory.createMessageFactory();
// 设置编码, 防止中文乱码
messageFactory.setDefaultContentEncodingCharset("gb2312");
@@ -223,4 +227,43 @@ public class SIPRequestHeaderPlarformProvider {
request.setContent(content, contentTypeHeader);
return request;
}
public Request createNotifyRequest(ParentPlatform parentPlatform, String content, String fromTag, String toTag, CallIdHeader callIdHeader) throws PeerUnavailableException, ParseException, InvalidArgumentException {
Request request = null;
// sipuri
SipURI requestURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP()+ ":" + parentPlatform.getServerPort());
// via
ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>();
ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(parentPlatform.getDeviceIp(), Integer.parseInt(parentPlatform.getDevicePort()),
parentPlatform.getTransport(), null);
viaHeader.setRPort();
viaHeaders.add(viaHeader);
// from
SipURI fromSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getDeviceGBId(),
parentPlatform.getDeviceIp() + ":" + parentPlatform.getDevicePort());
Address fromAddress = sipFactory.createAddressFactory().createAddress(fromSipURI);
FromHeader fromHeader = sipFactory.createHeaderFactory().createFromHeader(fromAddress, fromTag);
// to
SipURI toSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerGBDomain());
Address toAddress = sipFactory.createAddressFactory().createAddress(toSipURI);
ToHeader toHeader = sipFactory.createHeaderFactory().createToHeader(toAddress, toTag);
// Forwards
MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
// ceq
CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.NOTIFY), Request.NOTIFY);
MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipFactory.createMessageFactory();
// 设置编码, 防止中文乱码
messageFactory.setDefaultContentEncodingCharset("gb2312");
request = messageFactory.createRequest(requestURI, Request.NOTIFY, callIdHeader, cSeqHeader, fromHeader,
toHeader, viaHeaders, maxForwards);
List<String> agentParam = new ArrayList<>();
agentParam.add("wvp-pro");
UserAgentHeader userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam);
request.addHeader(userAgentHeader);
ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "MANSCDP+xml");
request.setContent(content, contentTypeHeader);
return request;
}
}

View File

@@ -3,9 +3,11 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,7 +94,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
callIdHeader = udpSipProvider.getNewCallId();
}
request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, 1L, "FromRegister" + tm, null, callIdHeader);
request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(Request.REGISTER), "FromRegister" + tm, null, callIdHeader);
// 将 callid 写入缓存, 等注册成功可以更新状态
redisCatchStorage.updatePlatformRegisterInfo(callIdHeader.getCallId(), parentPlatform.getServerGBId());
@@ -222,7 +224,13 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
catalogXml.append("<Secrecy>" + channel.getSecrecy() + "</Secrecy>\r\n");
catalogXml.append("<RegisterWay>" + channel.getRegisterWay() + "</RegisterWay>\r\n");
catalogXml.append("<Status>" + (channel.getStatus() == 0?"OFF":"ON") + "</Status>\r\n");
catalogXml.append("<Info></Info>\r\n");
catalogXml.append("<Longitude>" + channel.getLongitude() + "</Longitude>\r\n");
catalogXml.append("<Latitude>" + channel.getLatitude() + "</Latitude>\r\n");
catalogXml.append("<IPAddress>" + channel.getIpAddress() + "</IPAddress>\r\n");
catalogXml.append("<Port>" + channel.getPort() + "</Port>\r\n");
catalogXml.append("<Info>\r\n");
catalogXml.append("<PTZType>" + channel.getPTZType() + "</PTZType>\r\n");
catalogXml.append("</Info>\r\n");
}
@@ -319,4 +327,41 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
}
return true;
}
@Override
public boolean sendMobilePosition(ParentPlatform parentPlatform, GPSMsgInfo gpsMsgInfo, SubscribeInfo subscribeInfo) {
if (parentPlatform == null) {
return false;
}
try {
StringBuffer deviceStatusXml = new StringBuffer(600);
deviceStatusXml.append("<?xml version=\"1.0\" encoding=\"GB2312\"?>\r\n");
deviceStatusXml.append("<Notify>\r\n");
deviceStatusXml.append("<CmdType>MobilePosition</CmdType>\r\n");
deviceStatusXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n");
deviceStatusXml.append("<DeviceID>" + gpsMsgInfo.getId() + "</DeviceID>\r\n");
deviceStatusXml.append("<Time>" + gpsMsgInfo.getTime() + "</Time>\r\n");
deviceStatusXml.append("<Longitude>" + gpsMsgInfo.getLng() + "</Longitude>\r\n");
deviceStatusXml.append("<Latitude>" + gpsMsgInfo.getLat() + "</Latitude>\r\n");
deviceStatusXml.append("<Speed>" + gpsMsgInfo.getSpeed() + "</Speed>\r\n");
deviceStatusXml.append("<Direction>" + gpsMsgInfo.getDirection() + "</Direction>\r\n");
deviceStatusXml.append("<Altitude>" + gpsMsgInfo.getAltitude() + "</Altitude>\r\n");
deviceStatusXml.append("</Notify>\r\n");
CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
: udpSipProvider.getNewCallId();
callIdHeader.setCallId(subscribeInfo.getCallId());
String tm = Long.toString(System.currentTimeMillis());
Request request = headerProviderPlarformProvider.createNotifyRequest(parentPlatform, deviceStatusXml.toString(), subscribeInfo.getToTag(), subscribeInfo.getFromTag(), callIdHeader);
transmitRequest(parentPlatform, request);
} catch (SipException | ParseException | InvalidArgumentException e) {
e.printStackTrace();
return false;
}
return true;
}
}

View File

@@ -18,6 +18,7 @@ import javax.sip.address.Address;
import javax.sip.address.AddressFactory;
import javax.sip.address.SipURI;
import javax.sip.header.ContentTypeHeader;
import javax.sip.header.ExpiresHeader;
import javax.sip.header.HeaderFactory;
import javax.sip.header.ViaHeader;
import javax.sip.message.MessageFactory;
@@ -153,7 +154,7 @@ public abstract class SIPRequestProcessorParent {
* @throws InvalidArgumentException
* @throws ParseException
*/
public void responseAck(RequestEvent evt, String sdp) throws SipException, InvalidArgumentException, ParseException {
public void responseSdpAck(RequestEvent evt, String sdp) throws SipException, InvalidArgumentException, ParseException {
Response response = getMessageFactory().createResponse(Response.OK, evt.getRequest());
SipFactory sipFactory = SipFactory.getInstance();
ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "SDP");
@@ -168,6 +169,31 @@ public abstract class SIPRequestProcessorParent {
getServerTransaction(evt).sendResponse(response);
}
/**
* 回复带xml的200
* @param evt
* @param xml
* @throws SipException
* @throws InvalidArgumentException
* @throws ParseException
*/
public Response responseXmlAck(RequestEvent evt, String xml) throws SipException, InvalidArgumentException, ParseException {
Response response = getMessageFactory().createResponse(Response.OK, evt.getRequest());
SipFactory sipFactory = SipFactory.getInstance();
ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "MANSCDP+xml");
response.setContent(xml, contentTypeHeader);
SipURI sipURI = (SipURI)evt.getRequest().getRequestURI();
Address concatAddress = sipFactory.createAddressFactory().createAddress(
sipFactory.createAddressFactory().createSipURI(sipURI.getUser(), sipURI.getHost()+":"+sipURI.getPort()
));
response.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
response.addHeader(evt.getRequest().getHeader(ExpiresHeader.NAME));
getServerTransaction(evt).sendResponse(response);
return response;
}
public Element getRootElement(RequestEvent evt) throws DocumentException {
return getRootElement(evt, "gb2312");
}

View File

@@ -107,6 +107,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
// 查询平台下是否有该通道
DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId);
List<GbStream> gbStreams = storager.queryStreamInParentPlatform(requesterId, channelId);
PlatformCatalog catalog = storager.getCatalog(channelId);
GbStream gbStream = gbStreams.size() > 0? gbStreams.get(0):null;
MediaServerItem mediaServerItem = null;
// 不是通道可能是直播流
@@ -132,7 +133,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
return;
}
responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在发181呼叫转接中
}else {
}else if (catalog != null) {
responseAck(evt, Response.BAD_REQUEST, "catalog channel can not play"); // 目录不支持点播
return;
} else {
logger.info("通道不存在返回404");
responseAck(evt, Response.NOT_FOUND); // 通道不存在发404资源不存在
return;
@@ -249,7 +253,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
content.append("f=\r\n");
try {
responseAck(evt, content.toString());
responseSdpAck(evt, content.toString());
} catch (SipException e) {
e.printStackTrace();
} catch (InvalidArgumentException e) {
@@ -306,7 +310,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
content.append("f=\r\n");
try {
responseAck(evt, content.toString());
responseSdpAck(evt, content.toString());
} catch (SipException e) {
e.printStackTrace();
} catch (InvalidArgumentException e) {

View File

@@ -62,9 +62,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
@Autowired
private DeviceOffLineDetector offLineDetector;
private static final String NOTIFY_CATALOG = "Catalog";
private static final String NOTIFY_ALARM = "Alarm";
private static final String NOTIFY_MOBILE_POSITION = "MobilePosition";
private String method = "NOTIFY";
@Autowired
@@ -82,13 +80,13 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
Element rootElement = getRootElement(evt);
String cmd = XmlUtil.getText(rootElement, "CmdType");
if (NOTIFY_CATALOG.equals(cmd)) {
if (CmdType.CATALOG.equals(cmd)) {
logger.info("接收到Catalog通知");
processNotifyCatalogList(evt);
} else if (NOTIFY_ALARM.equals(cmd)) {
} else if (CmdType.ALARM.equals(cmd)) {
logger.info("接收到Alarm通知");
processNotifyAlarm(evt);
} else if (NOTIFY_MOBILE_POSITION.equals(cmd)) {
} else if (CmdType.MOBILE_POSITION.equals(cmd)) {
logger.info("接收到MobilePosition通知");
processNotifyMobilePosition(evt);
} else {

View File

@@ -1,8 +1,21 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.CmdType;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
import com.genersoft.iot.vmp.gb28181.task.GPSSubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
@@ -13,7 +26,10 @@ import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.ServerTransaction;
import javax.sip.SipException;
import javax.sip.header.CallIdHeader;
import javax.sip.header.ExpiresHeader;
import javax.sip.header.Header;
import javax.sip.header.ToHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.text.ParseException;
@@ -30,6 +46,21 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
@Autowired
private SIPProcessorObserver sipProcessorObserver;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private ISIPCommanderForPlatform sipCommanderForPlatform;
@Autowired
private IVideoManagerStorager storager;
@Autowired
private DynamicTask dynamicTask;
@Autowired
private UserSetup userSetup;
@Override
public void afterPropertiesSet() throws Exception {
// 添加消息处理的订阅
@@ -46,30 +77,107 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
Request request = evt.getRequest();
try {
Response response = null;
response = getMessageFactory().createResponse(200, request);
if (response != null) {
ExpiresHeader expireHeader = getHeaderFactory().createExpiresHeader(30);
response.setExpires(expireHeader);
}
logger.info("response : " + response.toString());
ServerTransaction transaction = getServerTransaction(evt);
if (transaction != null) {
transaction.sendResponse(response);
transaction.getDialog().delete();
transaction.terminate();
Element rootElement = getRootElement(evt);
String cmd = XmlUtil.getText(rootElement, "CmdType");
if (CmdType.MOBILE_POSITION.equals(cmd)) {
logger.info("接收到MobilePosition订阅");
processNotifyMobilePosition(evt, rootElement);
// } else if (CmdType.ALARM.equals(cmd)) {
// logger.info("接收到Alarm订阅");
// processNotifyAlarm(evt, rootElement);
// } else if (CmdType.CATALOG.equals(cmd)) {
// logger.info("接收到Catalog订阅");
// processNotifyCatalogList(evt, rootElement);
} else {
logger.info("processRequest serverTransactionId is null.");
logger.info("接收到消息:" + cmd);
// responseAck(evt, Response.OK);
Response response = null;
response = getMessageFactory().createResponse(200, request);
if (response != null) {
ExpiresHeader expireHeader = getHeaderFactory().createExpiresHeader(30);
response.setExpires(expireHeader);
}
logger.info("response : " + response.toString());
ServerTransaction transaction = getServerTransaction(evt);
if (transaction != null) {
transaction.sendResponse(response);
transaction.getDialog().delete();
transaction.terminate();
} else {
logger.info("processRequest serverTransactionId is null.");
}
}
} catch (ParseException e) {
e.printStackTrace();
} catch (SipException e) {
e.printStackTrace();
} catch (InvalidArgumentException e) {
e.printStackTrace();
} catch (DocumentException e) {
e.printStackTrace();
}
}
/**
* 处理移动位置订阅消息
*/
private void processNotifyMobilePosition(RequestEvent evt, Element rootElement) {
String platformId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
String deviceID = XmlUtil.getText(rootElement, "DeviceID");
SubscribeInfo subscribeInfo = new SubscribeInfo(evt, platformId);
String sn = XmlUtil.getText(rootElement, "SN");
String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_MobilePosition_" + platformId;
StringBuilder resultXml = new StringBuilder(200);
resultXml.append("<?xml version=\"1.0\" ?>\r\n")
.append("<Response>\r\n")
.append("<CmdType>MobilePosition</CmdType>\r\n")
.append("<SN>" + sn + "</SN>\r\n")
.append("<DeviceID>" + deviceID + "</DeviceID>\r\n")
.append("<Result>OK</Result>\r\n")
.append("</Response>\r\n");
if (subscribeInfo.getExpires() > 0) {
if (redisCatchStorage.getSubscribe(key) != null) {
dynamicTask.stopCron(key);
}
String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔
dynamicTask.startCron(key, new GPSSubscribeTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key), Integer.parseInt(interval));
redisCatchStorage.updateSubscribe(key, subscribeInfo);
}else if (subscribeInfo.getExpires() == 0) {
dynamicTask.stopCron(key);
redisCatchStorage.delSubscribe(key);
}
try {
Response response = responseXmlAck(evt, resultXml.toString());
ToHeader toHeader = (ToHeader)response.getHeader(ToHeader.NAME);
subscribeInfo.setToTag(toHeader.getTag());
redisCatchStorage.updateSubscribe(key, subscribeInfo);
} catch (SipException e) {
e.printStackTrace();
} catch (InvalidArgumentException e) {
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
}
}
private void processNotifyAlarm(RequestEvent evt, Element rootElement) {
}
private void processNotifyCatalogList(RequestEvent evt, Element rootElement) {
}
}

View File

@@ -1,7 +1,9 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceNotFoundEvent;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
@@ -19,6 +21,7 @@ import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.Map;
@@ -39,6 +42,9 @@ public class MessageRequestProcessor extends SIPRequestProcessorParent implement
@Autowired
private IVideoManagerStorager storage;
@Autowired
private SipSubscribe sipSubscribe;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@@ -56,6 +62,7 @@ public class MessageRequestProcessor extends SIPRequestProcessorParent implement
public void process(RequestEvent evt) {
logger.debug("接收到消息:" + evt.getRequest());
String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME);
// 查询设备是否存在
Device device = redisCatchStorage.getDevice(deviceId);
// 查询上级平台是否存在
@@ -63,7 +70,12 @@ public class MessageRequestProcessor extends SIPRequestProcessorParent implement
try {
if (device == null && parentPlatform == null) {
// 不存在则回复404
responseAck(evt, Response.NOT_FOUND, "device id not found");
responseAck(evt, Response.NOT_FOUND, "device "+ deviceId +" not found");
logger.warn("[设备未找到 ] {}", deviceId);
if (sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()) != null){
SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(new DeviceNotFoundEvent(evt.getDialog()));
sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()).response(eventResult);
};
}else {
Element rootElement = getRootElement(evt);
String name = rootElement.getName();

View File

@@ -1,10 +1,7 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.cmd;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
@@ -71,11 +68,41 @@ public class CatalogNotifyMessageHandler extends SIPRequestProcessorParent imple
// 查询关联的直播通道
List<GbStream> gbStreams = storager.queryGbStreamListInPlatform(parentPlatform.getServerGBId());
int size = channelReduces.size() + gbStreams.size();
// 回复目录信息
List<PlatformCatalog> catalogs = storager.queryCatalogInPlatform(parentPlatform.getServerGBId());
if (catalogs.size() > 0) {
for (PlatformCatalog catalog : catalogs) {
DeviceChannel deviceChannel = new DeviceChannel();
deviceChannel.setChannelId(catalog.getId());
deviceChannel.setName(catalog.getName());
deviceChannel.setLongitude(0.0);
deviceChannel.setLatitude(0.0);
deviceChannel.setDeviceId(parentPlatform.getDeviceGBId());
deviceChannel.setManufacture("wvp-pro");
deviceChannel.setStatus(1);
deviceChannel.setParental(1);
deviceChannel.setParentId(catalog.getParentId());
deviceChannel.setRegisterWay(1);
deviceChannel.setCivilCode(config.getDomain());
deviceChannel.setModel("live");
deviceChannel.setOwner("wvp-pro");
deviceChannel.setSecrecy("0");
cmderFroPlatform.catalogQuery(deviceChannel, parentPlatform, sn, fromHeader.getTag(), size);
// 防止发送过快
Thread.sleep(10);
}
}
// 回复级联的通道
if (channelReduces.size() > 0) {
for (ChannelReduce channelReduce : channelReduces) {
DeviceChannel deviceChannel = storager.queryChannel(channelReduce.getDeviceId(), channelReduce.getChannelId());
// TODO 目前暂时认为这里只用通道没有目录
deviceChannel.setParental(0);
deviceChannel.setParentId(channelReduce.getCatalogId());
cmderFroPlatform.catalogQuery(deviceChannel, parentPlatform, sn, fromHeader.getTag(), size);
// 防止发送过快
Thread.sleep(10);
}
}
// 回复直播的通道
@@ -89,16 +116,16 @@ public class CatalogNotifyMessageHandler extends SIPRequestProcessorParent imple
deviceChannel.setDeviceId(parentPlatform.getDeviceGBId());
deviceChannel.setManufacture("wvp-pro");
deviceChannel.setStatus(gbStream.isStatus()?1:0);
// deviceChannel.setParentId(parentPlatform.getDeviceGBId());
deviceChannel.setParentId(gbStream.getCatalogId());
deviceChannel.setRegisterWay(1);
deviceChannel.setCivilCode(config.getDomain());
deviceChannel.setModel("live");
deviceChannel.setOwner("wvp-pro");
deviceChannel.setParental(0);
deviceChannel.setSecrecy("0");
deviceChannel.setSecrecy("0");
cmderFroPlatform.catalogQuery(deviceChannel, parentPlatform, sn, fromHeader.getTag(), size);
// 防止发送过快
Thread.sleep(10);
}
}
if (size == 0) {
@@ -111,6 +138,8 @@ public class CatalogNotifyMessageHandler extends SIPRequestProcessorParent imple
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

View File

@@ -1,10 +1,7 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.query.cmd;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
@@ -73,12 +70,41 @@ public class CatalogQueryMessageHandler extends SIPRequestProcessorParent implem
List<ChannelReduce> channelReduces = storager.queryChannelListInParentPlatform(parentPlatform.getServerGBId());
// 查询关联的直播通道
List<GbStream> gbStreams = storager.queryGbStreamListInPlatform(parentPlatform.getServerGBId());
int size = channelReduces.size() + gbStreams.size();
// 回复目录信息
List<PlatformCatalog> catalogs = storager.queryCatalogInPlatform(parentPlatform.getServerGBId());
int size = catalogs.size() + channelReduces.size() + gbStreams.size();
if (catalogs.size() > 0) {
for (PlatformCatalog catalog : catalogs) {
DeviceChannel deviceChannel = new DeviceChannel();
deviceChannel.setChannelId(catalog.getId());
deviceChannel.setName(catalog.getName());
deviceChannel.setLongitude(0.0);
deviceChannel.setLatitude(0.0);
deviceChannel.setDeviceId(parentPlatform.getDeviceGBId());
deviceChannel.setManufacture("wvp-pro");
deviceChannel.setStatus(1);
deviceChannel.setParental(1);
deviceChannel.setParentId(catalog.getParentId());
deviceChannel.setRegisterWay(1);
deviceChannel.setCivilCode(config.getDomain());
deviceChannel.setModel("live");
deviceChannel.setOwner("wvp-pro");
deviceChannel.setSecrecy("0");
cmderFroPlatform.catalogQuery(deviceChannel, parentPlatform, sn, fromHeader.getTag(), size);
// 防止发送过快
Thread.sleep(10);
}
}
// 回复级联的通道
if (channelReduces.size() > 0) {
for (ChannelReduce channelReduce : channelReduces) {
DeviceChannel deviceChannel = storager.queryChannel(channelReduce.getDeviceId(), channelReduce.getChannelId());
// TODO 目前暂时认为这里只用通道没有目录
deviceChannel.setParental(0);
deviceChannel.setParentId(channelReduce.getCatalogId());
cmderFroPlatform.catalogQuery(deviceChannel, parentPlatform, sn, fromHeader.getTag(), size);
// 防止发送过快
Thread.sleep(10);
}
}
// 回复直播的通道
@@ -92,14 +118,13 @@ public class CatalogQueryMessageHandler extends SIPRequestProcessorParent implem
deviceChannel.setDeviceId(parentPlatform.getDeviceGBId());
deviceChannel.setManufacture("wvp-pro");
deviceChannel.setStatus(gbStream.isStatus()?1:0);
// deviceChannel.setParentId(parentPlatform.getDeviceGBId());
deviceChannel.setParentId(gbStream.getCatalogId());
deviceChannel.setRegisterWay(1);
deviceChannel.setCivilCode(config.getDomain());
deviceChannel.setModel("live");
deviceChannel.setOwner("wvp-pro");
deviceChannel.setParental(0);
deviceChannel.setSecrecy("0");
deviceChannel.setSecrecy("0");
cmderFroPlatform.catalogQuery(deviceChannel, parentPlatform, sn, fromHeader.getTag(), size);
}
@@ -114,6 +139,8 @@ public class CatalogQueryMessageHandler extends SIPRequestProcessorParent implem
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

View File

@@ -91,7 +91,6 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
// 遍历DeviceList
while (deviceListIterator.hasNext()) {
Element itemDevice = deviceListIterator.next();
Element channelDeviceElement = itemDevice.element("DeviceID");
if (channelDeviceElement == null) {
continue;

View File

@@ -4,7 +4,6 @@ import java.util.List;
import java.util.UUID;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.conf.UserSetup;
@@ -302,7 +301,7 @@ public class ZLMHttpHookListener {
@ResponseBody
@PostMapping(value = "/on_stream_changed", produces = "application/json;charset=UTF-8")
public ResponseEntity<String> onStreamChanged(@RequestBody MediaItem item){
if (logger.isDebugEnabled()) {
logger.debug("[ ZLM HOOK ]on_stream_changed API调用参数" + JSONObject.toJSONString(item));
}
@@ -322,10 +321,8 @@ public class ZLMHttpHookListener {
String schema = item.getSchema();
List<MediaItem.MediaTrack> tracks = item.getTracks();
boolean regist = item.isRegist();
if (tracks != null) {
logger.info("[stream: " + streamId + "] on_stream_changed->>" + schema);
}
if ("rtmp".equals(schema)){
logger.info("on_stream_changed注册->{}, app->{}, stream->{}", regist, app, streamId);
if (regist) {
mediaServerService.addCount(mediaServerId);
}else {
@@ -346,24 +343,16 @@ public class ZLMHttpHookListener {
MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
if (mediaServerItem != null){
if (regist) {
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, streamId, tracks);
redisCatchStorage.addStream(mediaServerItem, type, app, streamId, streamInfo);
redisCatchStorage.addStream(mediaServerItem, type, app, streamId, item);
if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
|| item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
|| item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
zlmMediaListManager.addPush(item);
}
}else {
// 兼容流注销时类型错误的问题等zlm更新后删除
StreamPushItem streamPushItem = streamPushService.getPush(app, streamId);
if (streamPushItem != null) {
type = "PUSH";
}else {
StreamProxyItem streamProxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(app, streamId);
if (streamProxyByAppAndStream != null) {
type = "PULL";
}
}
// 兼容流注销时类型从redis记录获取
MediaItem mediaItem = redisCatchStorage.getStreamInfo(app, streamId, mediaServerId);
type = OriginType.values()[mediaItem.getOriginType()].getType();
zlmMediaListManager.removeMedia(app, streamId);
redisCatchStorage.removeStream(mediaServerItem.getId(), type, app, streamId);
}

View File

@@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.media.zlm.event;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
@@ -21,7 +22,7 @@ import org.springframework.stereotype.Component;
* @date: 2020年5月6日 上午11:35:46
*/
@Component
public class ZLMKeepliveTimeoutListener extends KeyExpirationEventMessageListener {
public class ZLMKeepliveTimeoutListener extends RedisKeyExpirationEventMessageListener {
private Logger logger = LoggerFactory.getLogger(ZLMKeepliveTimeoutListener.class);
@@ -37,20 +38,12 @@ public class ZLMKeepliveTimeoutListener extends KeyExpirationEventMessageListene
@Autowired
private IMediaServerService mediaServerService;
@Override
public void init() {
if (!userSetup.getRedisConfig()) {
// 配置springboot默认Config为空即不让应用去修改redis的默认配置因为Redis服务出于安全会禁用CONFIG命令给远程用户使用
setKeyspaceNotificationsConfigParameter("");
}
super.init();
public ZLMKeepliveTimeoutListener(RedisMessageListenerContainer listenerContainer, UserSetup userSetup) {
super(listenerContainer, userSetup);
}
public ZLMKeepliveTimeoutListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
/**
/**
* 监听失效的keykey格式为keeplive_deviceId
* @param message
* @param pattern

View File

@@ -30,7 +30,7 @@ public interface IGbStreamService {
* 保存国标关联
* @param gbStreams
*/
boolean addPlatformInfo(List<GbStream> gbStreams, String platformId);
boolean addPlatformInfo(List<GbStream> gbStreams, String platformId, String catalogId);
/**
* 移除国标关联

View File

@@ -0,0 +1,39 @@
package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* 定时查找redis中的GPS推送消息并保存到对应的流中
*/
@Component
public class StreamGPSSubscribeTask {
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private IVideoManagerStorager storager;
@Scheduled(fixedRate = 30 * 1000) //每30秒执行一次
public void execute(){
List<GPSMsgInfo> gpsMsgInfo = redisCatchStorage.getAllGpsMsgInfo();
if (gpsMsgInfo.size() > 0) {
storager.updateStreamGPS(gpsMsgInfo);
for (GPSMsgInfo msgInfo : gpsMsgInfo) {
msgInfo.setStored(true);
redisCatchStorage.updateGpsMsgInfo(msgInfo);
}
}
}
}

View File

@@ -0,0 +1,106 @@
package com.genersoft.iot.vmp.service.bean;
public class GPSMsgInfo {
/**
*
*/
private String id;
/**
* 经度 (必选)
*/
private double lng;
/**
* 纬度 (必选)
*/
private double lat;
/**
* 速度,单位:km/h (可选)
*/
private double speed;
/**
* 产生通知时间, 时间格式: 2020-01-14T14:32:12
*/
private String time;
/**
* 方向,取值为当前摄像头方向与正北方的顺时针夹角,取值范围0°~360°,单位:(°)(可选)
*/
private String direction;
/**
* 海拔高度,单位:m(可选)
*/
private String altitude;
private boolean stored;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public double getLng() {
return lng;
}
public void setLng(double lng) {
this.lng = lng;
}
public double getLat() {
return lat;
}
public void setLat(double lat) {
this.lat = lat;
}
public double getSpeed() {
return speed;
}
public void setSpeed(double speed) {
this.speed = speed;
}
public String getTime() {
return time;
}
public void setTime(String time) {
this.time = time;
}
public String getDirection() {
return direction;
}
public void setDirection(String direction) {
this.direction = direction;
}
public String getAltitude() {
return altitude;
}
public void setAltitude(String altitude) {
this.altitude = altitude;
}
public boolean isStored() {
return stored;
}
public void setStored(boolean stored) {
this.stored = stored;
}
}

View File

@@ -47,13 +47,15 @@ public class GbStreamServiceImpl implements IGbStreamService {
@Override
public boolean addPlatformInfo(List<GbStream> gbStreams, String platformId) {
public boolean addPlatformInfo(List<GbStream> gbStreams, String platformId, String catalogId) {
// 放在事务内执行
boolean result = false;
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
try {
for (GbStream gbStream : gbStreams) {
gbStream.setCatalogId(catalogId);
gbStream.setPlatformId(platformId);
// TODO 修改为批量提交
platformGbStreamMapper.add(gbStream);
}
dataSourceTransactionManager.commit(transactionStatus); //手动提交

View File

@@ -0,0 +1,22 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson.JSON;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
@Component
public class RedisGPSMsgListener implements MessageListener {
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Override
public void onMessage(Message message, byte[] bytes) {
GPSMsgInfo gpsMsgInfo = JSON.parseObject(message.getBody(), GPSMsgInfo.class);
redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo);
}
}

View File

@@ -130,7 +130,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
if ( !StringUtils.isEmpty(param.getPlatformGbId()) && streamLive) {
List<GbStream> gbStreams = new ArrayList<>();
gbStreams.add(param);
if (gbStreamService.addPlatformInfo(gbStreams, param.getPlatformGbId())){
if (gbStreamService.addPlatformInfo(gbStreams, param.getPlatformGbId(), param.getCatalogId())){
result.append(", 关联国标平台[ " + param.getPlatformGbId() + " ]成功");
}else {
result.append(", 关联国标平台[ " + param.getPlatformGbId() + " ]失败");
@@ -141,6 +141,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
if (parentPlatforms.size() > 0) {
for (ParentPlatform parentPlatform : parentPlatforms) {
param.setPlatformId(parentPlatform.getServerGBId());
param.setCatalogId(parentPlatform.getCatalogId());
String stream = param.getStream();
StreamProxyItem streamProxyItems = platformGbStreamMapper.selectOne(param.getApp(), stream, parentPlatform.getServerGBId());
if (streamProxyItems == null) {
@@ -278,18 +279,18 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
String type = "PULL";
// 发送redis消息
List<StreamInfo> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
if (streamInfoList.size() > 0) {
for (StreamInfo streamInfo : streamInfoList) {
List<MediaItem> mediaItems = redisCatchStorage.getStreams(mediaServerId, type);
if (mediaItems.size() > 0) {
for (MediaItem mediaItem : mediaItems) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("serverId", userSetup.getServerId());
jsonObject.put("app", streamInfo.getApp());
jsonObject.put("stream", streamInfo.getStreamId());
jsonObject.put("app", mediaItem.getApp());
jsonObject.put("stream", mediaItem.getStream());
jsonObject.put("register", false);
jsonObject.put("mediaServerId", mediaServerId);
redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
// 移除redis内流的信息
redisCatchStorage.removeStream(mediaServerId, type, streamInfo.getApp(), streamInfo.getStreamId());
redisCatchStorage.removeStream(mediaServerId, type, mediaItem.getApp(), mediaItem.getStream());
}
}
}

View File

@@ -119,6 +119,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
List<ParentPlatform> parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream();
if (parentPlatforms.size() > 0) {
for (ParentPlatform parentPlatform : parentPlatforms) {
stream.setCatalogId(parentPlatform.getCatalogId());
stream.setPlatformId(parentPlatform.getServerGBId());
String streamId = stream.getStream();
StreamProxyItem streamProxyItems = platformGbStreamMapper.selectOne(stream.getApp(), streamId, parentPlatform.getServerGBId());
@@ -172,16 +173,16 @@ public class StreamPushServiceImpl implements IStreamPushService {
List<StreamPushItem> pushList = getPushList(mediaServerId);
Map<String, StreamPushItem> pushItemMap = new HashMap<>();
// redis记录
List<StreamInfo> streamInfoPushList = redisCatchStorage.getStreams(mediaServerId, "PUSH");
Map<String, StreamInfo> streamInfoPushItemMap = new HashMap<>();
List<MediaItem> mediaItems = redisCatchStorage.getStreams(mediaServerId, "PUSH");
Map<String, MediaItem> streamInfoPushItemMap = new HashMap<>();
if (pushList.size() > 0) {
for (StreamPushItem streamPushItem : pushList) {
pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
}
}
if (streamInfoPushList.size() > 0) {
for (StreamInfo streamInfo : streamInfoPushList) {
streamInfoPushItemMap.put(streamInfo.getApp() + streamInfo.getStreamId(), streamInfo);
if (mediaItems.size() > 0) {
for (MediaItem mediaItem : mediaItems) {
streamInfoPushItemMap.put(mediaItem.getApp() + mediaItem.getStream(), mediaItem);
}
}
zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
@@ -220,19 +221,19 @@ public class StreamPushServiceImpl implements IStreamPushService {
}
}
Collection<StreamInfo> offlineStreamInfoItems = streamInfoPushItemMap.values();
if (offlineStreamInfoItems.size() > 0) {
Collection<MediaItem> offlineMediaItemList = streamInfoPushItemMap.values();
if (offlineMediaItemList.size() > 0) {
String type = "PUSH";
for (StreamInfo offlineStreamInfoItem : offlineStreamInfoItems) {
for (MediaItem offlineMediaItem : offlineMediaItemList) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("serverId", userSetup.getServerId());
jsonObject.put("app", offlineStreamInfoItem.getApp());
jsonObject.put("stream", offlineStreamInfoItem.getStreamId());
jsonObject.put("app", offlineMediaItem.getApp());
jsonObject.put("stream", offlineMediaItem.getStream());
jsonObject.put("register", false);
jsonObject.put("mediaServerId", mediaServerId);
redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
// 移除redis内流的信息
redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineStreamInfoItem.getApp(), offlineStreamInfoItem.getStreamId());
redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineMediaItem.getApp(), offlineMediaItem.getStream());
}
}
}));
@@ -249,15 +250,15 @@ public class StreamPushServiceImpl implements IStreamPushService {
// 发送流停止消息
String type = "PUSH";
// 发送redis消息
List<StreamInfo> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
List<MediaItem> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
if (streamInfoList.size() > 0) {
for (StreamInfo streamInfo : streamInfoList) {
for (MediaItem mediaItem : streamInfoList) {
// 移除redis内流的信息
redisCatchStorage.removeStream(mediaServerId, type, streamInfo.getApp(), streamInfo.getStreamId());
redisCatchStorage.removeStream(mediaServerId, type, mediaItem.getApp(), mediaItem.getStream());
JSONObject jsonObject = new JSONObject();
jsonObject.put("serverId", userSetup.getServerId());
jsonObject.put("app", streamInfo.getApp());
jsonObject.put("stream", streamInfo.getStreamId());
jsonObject.put("app", mediaItem.getApp());
jsonObject.put("stream", mediaItem.getStream());
jsonObject.put("register", false);
jsonObject.put("mediaServerId", mediaServerId);
redisCatchStorage.sendStreamChangeMsg(type, jsonObject);

View File

@@ -2,11 +2,11 @@ package com.genersoft.iot.vmp.storager;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.service.bean.ThirdPartyGB;
import java.util.List;
@@ -145,7 +145,7 @@ public interface IRedisCatchStorage {
* @param app
* @param streamId
*/
void addStream(MediaServerItem mediaServerItem, String type, String app, String streamId, StreamInfo streamInfo);
void addStream(MediaServerItem mediaServerItem, String type, String app, String streamId, MediaItem item);
/**
* 移除流信息从redis
@@ -177,7 +177,7 @@ public interface IRedisCatchStorage {
*/
ThirdPartyGB queryMemberNoGBId(String queryKey);
List<StreamInfo> getStreams(String mediaServerId, String pull);
List<MediaItem> getStreams(String mediaServerId, String pull);
/**
* 将device信息写入redis
@@ -185,10 +185,29 @@ public interface IRedisCatchStorage {
*/
void updateDevice(Device device);
void removeDevice(String deviceId);
/**
* 获取Device
*/
Device getDevice(String deviceId);
void resetAllCSEQ();
void updateGpsMsgInfo(GPSMsgInfo gpsMsgInfo);
GPSMsgInfo getGpsMsgInfo(String gbId);
List<GPSMsgInfo> getAllGpsMsgInfo();
Long getSN(String method);
void resetAllSN();
void updateSubscribe(String key, SubscribeInfo subscribeInfo);
SubscribeInfo getSubscribe(String key);
void delSubscribe(String key);
MediaItem getStreamInfo(String app, String streamId, String mediaServerId);
}

View File

@@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
import com.github.pagehelper.PageInfo;
@@ -243,7 +244,7 @@ public interface IVideoManagerStorager {
* @param channelReduces
* @return
*/
int updateChannelForGB(String platformId, List<ChannelReduce> channelReduces);
int updateChannelForGB(String platformId, List<ChannelReduce> channelReduces, String catalogId);
/**
* 移除上级平台的通道信息
@@ -256,6 +257,9 @@ public interface IVideoManagerStorager {
DeviceChannel queryChannelInParentPlatform(String platformId, String channelId);
List<PlatformCatalog> queryChannelInParentPlatformAndCatalog(String platformId, String catalogId);
List<PlatformCatalog> queryStreamInParentPlatformAndCatalog(String platformId, String catalogId);
Device queryVideoDeviceByPlatformIdAndChannelId(String platformId, String channelId);
@@ -431,4 +435,28 @@ public interface IVideoManagerStorager {
* @param deviceChannelList
*/
boolean resetChannels(String deviceId, List<DeviceChannel> deviceChannelList);
/**
* 获取目录信息
* @param platformId
* @param parentId
* @return
*/
List<PlatformCatalog> getChildrenCatalogByPlatform(String platformId, String parentId);
int addCatalog(PlatformCatalog platformCatalog);
PlatformCatalog getCatalog(String id);
int delCatalog(String id);
int updateCatalog(PlatformCatalog platformCatalog);
int setDefaultCatalog(String platformId, String catalogId);
List<PlatformCatalog> queryCatalogInPlatform(String serverGBId);
int delRelation(PlatformCatalog platformCatalog);
int updateStreamGPS(List<GPSMsgInfo> gpsMsgInfo);
}

View File

@@ -91,7 +91,8 @@ public interface DeviceChannelMapper {
"SELECT * FROM ( "+
" SELECT dc.channelId, dc.deviceId, dc.name, de.manufacturer, de.hostAddress, " +
"(SELECT count(0) FROM device_channel WHERE parentId=dc.channelId) as subCount, " +
"(SELECT pc.platformId FROM platform_gb_channel pc WHERE pc.deviceId=dc.deviceId AND pc.channelId = dc.channelId ) as platformId " +
"(SELECT pc.platformId FROM platform_gb_channel pc WHERE pc.deviceId=dc.deviceId AND pc.channelId = dc.channelId ) as platformId, " +
"(SELECT pc.catalogId FROM platform_gb_channel pc WHERE pc.deviceId=dc.deviceId AND pc.channelId = dc.channelId ) as catalogId " +
"FROM device_channel dc " +
"LEFT JOIN device de ON dc.deviceId = de.deviceId " +
" WHERE 1=1 " +

View File

@@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.storager.dao;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import org.apache.ibatis.annotations.*;
import org.springframework.stereotype.Repository;
@@ -35,7 +36,7 @@ public interface GbStreamMapper {
@Delete("DELETE FROM gb_stream WHERE app=#{app} AND stream=#{stream}")
int del(String app, String stream);
@Select("SELECT gs.*, pgs.platformId FROM gb_stream gs LEFT JOIN platform_gb_stream pgs ON gs.app = pgs.app AND gs.stream = pgs.stream")
@Select("SELECT gs.*, pgs.platformId AS platformId, pgs.catalogId AS catalogId FROM gb_stream gs LEFT JOIN platform_gb_stream pgs ON gs.app = pgs.app AND gs.stream = pgs.stream")
List<GbStream> selectAll();
@Select("SELECT * FROM gb_stream WHERE app=#{app} AND stream=#{stream}")
@@ -44,32 +45,31 @@ public interface GbStreamMapper {
@Select("SELECT * FROM gb_stream WHERE gbId=#{gbId}")
List<GbStream> selectByGBId(String gbId);
@Select("SELECT gs.*, pgs.platformId FROM gb_stream gs " +
@Select("SELECT gs.*, pgs.platformId as platformId, pgs.catalogId as catalogId FROM gb_stream gs " +
"LEFT JOIN platform_gb_stream pgs ON gs.app = pgs.app AND gs.stream = pgs.stream " +
"WHERE gs.gbId = '${gbId}' AND pgs.platformId = '${platformId}'")
List<GbStream> queryStreamInPlatform(String platformId, String gbId);
@Select("SELECT gs.*, pgs.platformId FROM gb_stream gs " +
@Select("SELECT gs.*, pgs.platformId as platformId, pgs.catalogId as catalogId FROM gb_stream gs " +
"LEFT JOIN platform_gb_stream pgs ON gs.app = pgs.app AND gs.stream = pgs.stream " +
"WHERE pgs.platformId = '${platformId}'")
List<GbStream> queryGbStreamListInPlatform(String platformId);
@Select("SELECT gs.*, pgs.platformId as platformId, pgs.catalogId as catalogId FROM gb_stream gs LEFT JOIN platform_gb_stream pgs " +
"ON gs.app = pgs.app and gs.stream = pgs.stream WHERE pgs.app is NULL and pgs.stream is NULL")
List<GbStream> queryStreamNotInPlatform();
@Update("UPDATE gb_stream " +
"SET status=${status} " +
"WHERE app=#{app} AND stream=#{stream}")
int setStatus(String app, String stream, boolean status);
@Select("SELECT gs.*, pgs.platformId FROM gb_stream gs LEFT JOIN platform_gb_stream pgs ON gs.app = pgs.app AND gs.stream = pgs.stream WHERE mediaServerId=#{mediaServerId} ")
List<GbStream> selectAllByMediaServerId(String mediaServerId);
@Update("UPDATE gb_stream " +
"SET status=${status} " +
"WHERE mediaServerId=#{mediaServerId} ")
void updateStatusByMediaServerId(String mediaServerId, boolean status);
@Select("SELECT * FROM gb_stream WHERE mediaServerId=#{mediaServerId}")
void delByMediaServerId(String mediaServerId);
@Delete("DELETE FROM gb_stream WHERE streamType=#{type} AND gbId=NULL AND mediaServerId=#{mediaServerId}")
void deleteWithoutGBId(String type, String mediaServerId);
@@ -93,4 +93,15 @@ public interface GbStreamMapper {
"</foreach> " +
"</script>")
void batchAdd(List<StreamPushItem> subList);
@Update({"<script>" +
"<foreach collection='gpsMsgInfos' item='item' separator=';'>" +
" UPDATE" +
" gb_stream" +
" SET longitude=${item.lng}, latitude=${item.lat} " +
"WHERE gbId=#{item.id}"+
"</foreach>" +
"</script>"})
int updateStreamGPS(List<GPSMsgInfo> gpsMsgInfos);
}

View File

@@ -15,10 +15,10 @@ public interface ParentPlatformMapper {
@Insert("INSERT INTO parent_platform (enable, name, serverGBId, serverGBDomain, serverIP, serverPort, deviceGBId, deviceIp, " +
" devicePort, username, password, expires, keepTimeout, transport, characterSet, ptz, rtcp, " +
" status, shareAllLiveStream) " +
" status, shareAllLiveStream, catalogId) " +
" VALUES (${enable}, '${name}', '${serverGBId}', '${serverGBDomain}', '${serverIP}', ${serverPort}, '${deviceGBId}', '${deviceIp}', " +
" '${devicePort}', '${username}', '${password}', '${expires}', '${keepTimeout}', '${transport}', '${characterSet}', ${ptz}, ${rtcp}, " +
" ${status}, ${shareAllLiveStream})")
" ${status}, ${shareAllLiveStream}, #{catalogId})")
int addParentPlatform(ParentPlatform parentPlatform);
@Update("UPDATE parent_platform " +
@@ -40,7 +40,8 @@ public interface ParentPlatformMapper {
"ptz=#{ptz}, " +
"rtcp=#{rtcp}, " +
"status=#{status}, " +
"shareAllLiveStream=#{shareAllLiveStream} " +
"shareAllLiveStream=#{shareAllLiveStream}, " +
"catalogId=#{catalogId} " +
"WHERE id=#{id}")
int updateParentPlatform(ParentPlatform parentPlatform);
@@ -74,4 +75,11 @@ public interface ParentPlatformMapper {
@Select("SELECT * FROM parent_platform WHERE shareAllLiveStream=true")
List<ParentPlatform> selectAllAhareAllLiveStream();
@Update(value = {" <script>" +
"UPDATE parent_platform " +
"SET catalogId=#{catalogId}" +
"WHERE serverGBId=#{platformId}"+
"</script>"})
int setDefaultCatalog(String platformId, String catalogId);
}

View File

@@ -0,0 +1,42 @@
package com.genersoft.iot.vmp.storager.dao;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog;
import com.genersoft.iot.vmp.gb28181.bean.PlatformGbStream;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import org.apache.ibatis.annotations.*;
import org.springframework.stereotype.Repository;
import java.util.List;
@Mapper
@Repository
public interface PlatformCatalogMapper {
@Insert("INSERT INTO platform_catalog (id, name, platformId, parentId) VALUES" +
"(#{id}, #{name}, #{platformId}, #{parentId})")
int add(PlatformCatalog platformCatalog);
@Delete("DELETE FROM platform_catalog WHERE id=#{id}")
int del(String id);
@Delete("DELETE FROM platform_catalog WHERE platformId=#{platformId}")
int delByPlatformId(String platformId);
@Select("SELECT *, (SELECT COUNT(1) from platform_catalog where parentId = pc.id AND platformId=#{platformId}) as childrenCount FROM platform_catalog pc WHERE parentId=#{parentId} AND platformId=#{platformId}")
List<PlatformCatalog> selectByParentId(String platformId, String parentId);
@Select("SELECT *, (SELECT COUNT(1) from platform_catalog where parentId = pc.id) as childrenCount FROM platform_catalog pc WHERE pc.id=#{id}")
PlatformCatalog select(String id);
@Update(value = {" <script>" +
"UPDATE platform_catalog " +
"SET name=#{name}" +
"WHERE id=#{id}"+
"</script>"})
int update(PlatformCatalog platformCatalog);
@Select("SELECT *, (SELECT COUNT(1) from platform_catalog where parentId = pc.id) as childrenCount FROM platform_catalog pc WHERE pc.platformId=#{platformId}")
List<PlatformCatalog> selectByPlatForm(String platformId);
}

View File

@@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.storager.dao;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog;
import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
@@ -25,9 +26,9 @@ public interface PlatformChannelMapper {
List<String> findChannelRelatedPlatform(String platformId, List<String> deviceAndChannelIds);
@Insert("<script> "+
"INSERT INTO platform_gb_channel (channelId, deviceId, platformId, deviceAndChannelId) VALUES" +
"INSERT INTO platform_gb_channel (channelId, deviceId, platformId, deviceAndChannelId, catalogId) VALUES" +
"<foreach collection='channelReducesToAdd' item='item' separator=','>" +
" ('${item.channelId}','${item.deviceId}', '${platformId}', '${item.deviceId}_${item.channelId}' )" +
" ('${item.channelId}','${item.deviceId}', '${platformId}', '${item.deviceId}_${item.channelId}' , '${item.catalogId}' )" +
"</foreach>" +
"</script>")
int addChannels(String platformId, List<ChannelReduce> channelReducesToAdd);
@@ -54,6 +55,22 @@ public interface PlatformChannelMapper {
"platformId='${platformId}' AND channelId='${channelId}' ) AND channelId='${channelId}'")
DeviceChannel queryChannelInParentPlatform(String platformId, String channelId);
@Select("select dc.channelId as id, dc.name as name, pgc.platformId as platformId, pgc.catalogId as parentId, 0 as childrenCount, 1 as type " +
"from device_channel dc left join platform_gb_channel pgc on dc.deviceId = pgc.deviceId and dc.channelId = pgc.channelId " +
"where pgc.platformId=#{platformId} and pgc.catalogId=#{catalogId}")
List<PlatformCatalog> queryChannelInParentPlatformAndCatalog(String platformId, String catalogId);
@Select("SELECT * FROM device WHERE deviceId = (SELECT deviceId FROM platform_gb_channel WHERE platformId='${platformId}' AND channelId='${channelId}')")
Device queryVideoDeviceByPlatformIdAndChannelId(String platformId, String channelId);
@Delete("<script> "+
"DELETE FROM platform_gb_channel WHERE catalogId=#{id}" +
"</script>")
int delByCatalogId(String id);
@Delete("<script> "+
"DELETE FROM platform_gb_channel WHERE catalogId=#{parentId} AND platformId=#{platformId} AND channelId=#{id}" +
"</script>")
int delByCatalogIdAndChannelIdAndPlatformId(PlatformCatalog platformCatalog);
}

View File

@@ -1,5 +1,7 @@
package com.genersoft.iot.vmp.storager.dao;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog;
import com.genersoft.iot.vmp.gb28181.bean.PlatformGbStream;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import org.apache.ibatis.annotations.*;
@@ -12,8 +14,8 @@ import java.util.List;
@Repository
public interface PlatformGbStreamMapper {
@Insert("INSERT INTO platform_gb_stream (app, stream, platformId) VALUES" +
"('${app}', '${stream}', '${platformId}')")
@Insert("INSERT INTO platform_gb_stream (app, stream, platformId, catalogId) VALUES" +
"('${app}', '${stream}', '${platformId}', '${catalogId}')")
int add(PlatformGbStream platformGbStream);
@Delete("DELETE FROM platform_gb_stream WHERE app=#{app} AND stream=#{stream}")
@@ -27,4 +29,22 @@ public interface PlatformGbStreamMapper {
@Select("SELECT * FROM platform_gb_stream WHERE app=#{app} AND stream=#{stream} AND platformId=#{serverGBId}")
StreamProxyItem selectOne(String app, String stream, String serverGBId);
@Select("select gs.* \n" +
"from gb_stream gs\n" +
" left join platform_gb_stream pgs\n" +
" on gs.app = pgs.app and gs.stream = pgs.stream\n" +
"where pgs.platformId=#{platformId} and pgs.catalogId=#{catalogId}")
List<GbStream> queryChannelInParentPlatformAndCatalog(String platformId, String catalogId);
@Select("select gs.gbId as id, gs.name as name, pgs.platformId as platformId, pgs.catalogId as catalogId , 0 as childrenCount, 2 as type\n" +
"from gb_stream gs\n" +
" left join platform_gb_stream pgs\n" +
" on gs.app = pgs.app and gs.stream = pgs.stream\n" +
"where pgs.platformId=#{platformId} and pgs.catalogId=#{catalogId}")
List<PlatformCatalog> queryChannelInParentPlatformAndCatalogForCatlog(String platformId, String catalogId);
@Delete("DELETE FROM platform_gb_stream WHERE catalogId=#{id}")
int delByCatalogId(String id);
}

View File

@@ -5,7 +5,10 @@ import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.service.bean.ThirdPartyGB;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
@@ -48,6 +51,18 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
return result;
}
@Override
public Long getSN(String method) {
String key = VideoManagerConstants.SIP_SN_PREFIX + userSetup.getServerId() + "_" + method;
long result = redis.incr(key, 1L);
if (result > Integer.MAX_VALUE) {
redis.set(key, 1);
result = 1;
}
return result;
}
@Override
public void resetAllCSEQ() {
String scanKey = VideoManagerConstants.SIP_CSEQ_PREFIX + userSetup.getServerId() + "_*";
@@ -58,6 +73,16 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
}
}
@Override
public void resetAllSN() {
String scanKey = VideoManagerConstants.SIP_SN_PREFIX + userSetup.getServerId() + "_*";
List<Object> keys = redis.scan(scanKey);
for (int i = 0; i < keys.size(); i++) {
String key = (String) keys.get(i);
redis.set(key, 1);
}
}
/**
* 开始播放时将流存入redis
*
@@ -318,6 +343,15 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
redis.del(key.toString());
}
}
List<Object> deviceCache = redis.scan(String.format("%S%s_%s", VideoManagerConstants.DEVICE_PREFIX,
userSetup.getServerId(),
deviceId));
if (deviceCache.size() > 0) {
for (Object key : deviceCache) {
redis.del(key.toString());
}
}
}
@Override
@@ -354,9 +388,9 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
}
@Override
public void addStream(MediaServerItem mediaServerItem, String type, String app, String streamId, StreamInfo streamInfo) {
public void addStream(MediaServerItem mediaServerItem, String type, String app, String streamId, MediaItem mediaItem) {
String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetup.getServerId() + "_" + type + "_" + app + "_" + streamId + "_" + mediaServerItem.getId();
redis.set(key, streamInfo);
redis.set(key, mediaItem);
}
@Override
@@ -389,13 +423,13 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
}
@Override
public List<StreamInfo> getStreams(String mediaServerId, String type) {
List<StreamInfo> result = new ArrayList<>();
public List<MediaItem> getStreams(String mediaServerId, String type) {
List<MediaItem> result = new ArrayList<>();
String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetup.getServerId() + "_" + type + "_*_*_" + mediaServerId;
List<Object> streams = redis.scan(key);
for (Object stream : streams) {
StreamInfo streamInfo = (StreamInfo)redis.get((String) stream);
result.add(streamInfo);
MediaItem mediaItem = (MediaItem)redis.get((String) stream);
result.add(mediaItem);
}
return result;
}
@@ -406,9 +440,72 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
redis.set(key, device);
}
@Override
public void removeDevice(String deviceId) {
String key = VideoManagerConstants.DEVICE_PREFIX + userSetup.getServerId() + "_" + deviceId;
redis.del(key);
}
@Override
public Device getDevice(String deviceId) {
String key = VideoManagerConstants.DEVICE_PREFIX + userSetup.getServerId() + "_" + deviceId;
return (Device)redis.get(key);
}
@Override
public void updateGpsMsgInfo(GPSMsgInfo gpsMsgInfo) {
String key = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetup.getServerId() + "_" + gpsMsgInfo.getId();
redis.set(key, gpsMsgInfo, 60); // 默认GPS消息保存1分钟
}
@Override
public GPSMsgInfo getGpsMsgInfo(String gbId) {
String key = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetup.getServerId() + "_" + gbId;
return (GPSMsgInfo)redis.get(key);
}
@Override
public void updateSubscribe(String key, SubscribeInfo subscribeInfo) {
redis.set(key, subscribeInfo, subscribeInfo.getExpires());
}
@Override
public SubscribeInfo getSubscribe(String key) {
return (SubscribeInfo)redis.get(key);
}
@Override
public void delSubscribe(String key) {
redis.del(key);
}
@Override
public List<GPSMsgInfo> getAllGpsMsgInfo() {
String scanKey = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetup.getServerId() + "_*";
List<GPSMsgInfo> result = new ArrayList<>();
List<Object> keys = redis.scan(scanKey);
for (int i = 0; i < keys.size(); i++) {
String key = (String) keys.get(i);
GPSMsgInfo gpsMsgInfo = (GPSMsgInfo) redis.get(key);
if (!gpsMsgInfo.isStored()) { // 只取没有存过得
result.add((GPSMsgInfo)redis.get(key));
}
}
return result;
}
@Override
public MediaItem getStreamInfo(String app, String streamId, String mediaServerId) {
String scanKey = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetup.getServerId() + "_*_" + app + "_" + streamId + "_" + mediaServerId;
MediaItem result = null;
List<Object> keys = redis.scan(scanKey);
if (keys.size() > 0) {
String key = (String) keys.get(0);
result = (MediaItem)redis.get(key);
}
return result;
}
}

View File

@@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.storager.dao.*;
@@ -68,6 +69,9 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
@Autowired
private GbStreamMapper gbStreamMapper;
@Autowired
private PlatformCatalogMapper catalogMapper;
;
@Autowired
@@ -466,6 +470,9 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
@Override
public boolean addParentPlatform(ParentPlatform parentPlatform) {
if (parentPlatform.getCatalogId() == null) {
parentPlatform.setCatalogId(parentPlatform.getServerGBId());
}
int result = platformMapper.addParentPlatform(parentPlatform);
return result > 0;
}
@@ -475,6 +482,9 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
int result = 0;
ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); // .getDeviceGBId());
if (parentPlatform.getId() == null ) {
if (parentPlatform.getCatalogId() == null) {
parentPlatform.setCatalogId(parentPlatform.getServerGBId());
}
result = platformMapper.addParentPlatform(parentPlatform);
if (parentPlatformCatch == null) {
parentPlatformCatch = new ParentPlatformCatch();
@@ -494,15 +504,21 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
// 更新缓存
parentPlatformCatch.setParentPlatform(parentPlatform);
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
// 共享所有视频流,需要将现有视频流添加到此平台
List<GbStream> gbStreams = gbStreamMapper.selectAll();
if (gbStreams.size() > 0) {
if (parentPlatform.isShareAllLiveStream()) {
gbStreamService.addPlatformInfo(gbStreams, parentPlatform.getServerGBId());
}else {
gbStreamService.delPlatformInfo(gbStreams);
if (parentPlatform.isEnable()) {
// 共享所有视频流,需要将现有视频流添加到此平台
List<GbStream> gbStreams = gbStreamMapper.queryStreamNotInPlatform();
if (gbStreams.size() > 0) {
for (GbStream gbStream : gbStreams) {
gbStream.setCatalogId(parentPlatform.getCatalogId());
}
if (parentPlatform.isShareAllLiveStream()) {
gbStreamService.addPlatformInfo(gbStreams, parentPlatform.getServerGBId(), parentPlatform.getCatalogId());
}else {
gbStreamService.delPlatformInfo(gbStreams);
}
}
}
return result > 0;
}
@@ -553,10 +569,11 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
}
@Override
public int updateChannelForGB(String platformId, List<ChannelReduce> channelReduces) {
public int updateChannelForGB(String platformId, List<ChannelReduce> channelReduces, String catalogId) {
Map<String, ChannelReduce> deviceAndChannels = new HashMap<>();
for (ChannelReduce channelReduce : channelReduces) {
channelReduce.setCatalogId(catalogId);
deviceAndChannels.put(channelReduce.getDeviceId() + "_" + channelReduce.getChannelId(), channelReduce);
}
List<String> deviceAndChannelList = new ArrayList<>(deviceAndChannels.keySet());
@@ -593,6 +610,18 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
return channel;
}
@Override
public List<PlatformCatalog> queryChannelInParentPlatformAndCatalog(String platformId, String catalogId) {
List<PlatformCatalog> catalogs = platformChannelMapper.queryChannelInParentPlatformAndCatalog(platformId, catalogId);
return catalogs;
}
@Override
public List<PlatformCatalog> queryStreamInParentPlatformAndCatalog(String platformId, String catalogId) {
List<PlatformCatalog> catalogs = platformGbStreamMapper.queryChannelInParentPlatformAndCatalogForCatlog(platformId, catalogId);
return catalogs;
}
@Override
public Device queryVideoDeviceByPlatformIdAndChannelId(String platformId, String channelId) {
Device device = platformChannelMapper.queryVideoDeviceByPlatformIdAndChannelId(platformId, channelId);
@@ -756,6 +785,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
List<ParentPlatform> parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream();
if (parentPlatforms.size() > 0) {
for (ParentPlatform parentPlatform : parentPlatforms) {
streamPushItem.setCatalogId(parentPlatform.getCatalogId());
streamPushItem.setPlatformId(parentPlatform.getServerGBId());
String stream = streamPushItem.getStream();
StreamProxyItem streamProxyItems = platformGbStreamMapper.selectOne(streamPushItem.getApp(), stream, parentPlatform.getServerGBId());
@@ -821,4 +851,74 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
return streamProxyMapper.selectOne(app, streamId);
}
@Override
public List<PlatformCatalog> getChildrenCatalogByPlatform(String platformId, String parentId) {
return catalogMapper.selectByParentId(platformId, parentId);
}
@Override
public int addCatalog(PlatformCatalog platformCatalog) {
return catalogMapper.add(platformCatalog);
}
@Override
public PlatformCatalog getCatalog(String id) {
return catalogMapper.select(id);
}
@Override
public int delCatalog(String id) {
PlatformCatalog platformCatalog = catalogMapper.select(id);
if (platformCatalog.getChildrenCount() > 0) {
List<PlatformCatalog> platformCatalogList = catalogMapper.selectByParentId(platformCatalog.getPlatformId(), platformCatalog.getId());
for (PlatformCatalog catalog : platformCatalogList) {
if (catalog.getChildrenCount() == 0) {
catalogMapper.del(catalog.getId());
platformGbStreamMapper.delByCatalogId(catalog.getId());
platformChannelMapper.delByCatalogId(catalog.getId());
}else {
delCatalog(catalog.getId());
}
}
}
int delresult = catalogMapper.del(id);
int delStreamresult = platformGbStreamMapper.delByCatalogId(id);
int delChanneresult = platformChannelMapper.delByCatalogId(id);
return delresult + delChanneresult + delStreamresult;
}
@Override
public int updateCatalog(PlatformCatalog platformCatalog) {
return catalogMapper.update(platformCatalog);
}
@Override
public int setDefaultCatalog(String platformId, String catalogId) {
return platformMapper.setDefaultCatalog(platformId, catalogId);
}
@Override
public List<PlatformCatalog> queryCatalogInPlatform(String platformId) {
return catalogMapper.selectByPlatForm(platformId);
}
@Override
public int delRelation(PlatformCatalog platformCatalog) {
if (platformCatalog.getType() == 1) {
return platformChannelMapper.delByCatalogIdAndChannelIdAndPlatformId(platformCatalog);
}else if (platformCatalog.getType() == 2) {
List<GbStream> gbStreams = platformGbStreamMapper.queryChannelInParentPlatformAndCatalog(platformCatalog.getPlatformId(), platformCatalog.getParentId());
for (GbStream gbStream : gbStreams) {
if (gbStream.getGbId().equals(platformCatalog.getId())) {
return platformGbStreamMapper.delByAppAndStream(gbStream.getApp(), gbStream.getStream());
}
}
}
return 0;
}
@Override
public int updateStreamGPS(List<GPSMsgInfo> gpsMsgInfos) {
return gbStreamMapper.updateStreamGPS(gpsMsgInfos);
}
}

View File

@@ -153,12 +153,15 @@ public class DeviceQuery {
// 默认超时时间为30分钟
DeferredResult<ResponseEntity<Device>> result = new DeferredResult<ResponseEntity<Device>>(30*60*1000L);
result.onTimeout(()->{
logger.warn(String.format("设备通道信息同步超时"));
logger.warn("设备[{}]通道信息同步超时", deviceId);
// 释放rtpserver
RequestMessage msg = new RequestMessage();
msg.setKey(key);
msg.setId(uuid);
WVPResult<Object> wvpResult = new WVPResult<>();
wvpResult.setCode(0);
wvpResult.setMsg("Timeout");
wvpResult.setCode(-1);
wvpResult.setData(device);
wvpResult.setMsg("更新超时");
msg.setData(wvpResult);
resultHolder.invokeAllResult(msg);
@@ -170,8 +173,10 @@ public class DeviceQuery {
cmder.catalogQuery(device, event -> {
RequestMessage msg = new RequestMessage();
msg.setKey(key);
msg.setId(uuid);
WVPResult<Object> wvpResult = new WVPResult<>();
wvpResult.setCode(0);
wvpResult.setCode(-1);
wvpResult.setData(device);
wvpResult.setMsg(String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg));
msg.setData(wvpResult);
resultHolder.invokeAllResult(msg);

View File

@@ -82,7 +82,7 @@ public class GbStreamController {
@PostMapping(value = "/add")
@ResponseBody
public Object add(@RequestBody GbStreamParam gbStreamParam){
if (gbStreamService.addPlatformInfo(gbStreamParam.getGbStreams(), gbStreamParam.getPlatformId())) {
if (gbStreamService.addPlatformInfo(gbStreamParam.getGbStreams(), gbStreamParam.getPlatformId(), gbStreamParam.getCatalogId())) {
return "success";
}else {
return "fail";

View File

@@ -8,12 +8,22 @@ public class GbStreamParam {
private String platformId;
private String catalogId;
private List<GbStream> gbStreams;
public String getPlatformId() {
return platformId;
}
public String getCatalogId() {
return catalogId;
}
public void setCatalogId(String catalogId) {
this.catalogId = catalogId;
}
public void setPlatformId(String platformId) {
this.platformId = platformId;
}

View File

@@ -1,10 +1,15 @@
package com.genersoft.iot.vmp.vmanager.gb28181.platform;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.gb28181.bean.CatalogData;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.UpdateChannelParam;
import com.github.pagehelper.PageInfo;
@@ -21,6 +26,8 @@ import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.*;
import com.genersoft.iot.vmp.conf.SipConfig;
import java.util.List;
/**
* 级联平台管理
*/
@@ -253,7 +260,7 @@ public class PlatformController {
if (logger.isDebugEnabled()) {
logger.debug("给上级平台添加国标通道API调用");
}
int result = storager.updateChannelForGB(param.getPlatformId(), param.getChannelReduces());
int result = storager.updateChannelForGB(param.getPlatformId(), param.getChannelReduces(), param.getCatalogId());
return new ResponseEntity<>(String.valueOf(result > 0), HttpStatus.OK);
}
@@ -279,5 +286,197 @@ public class PlatformController {
return new ResponseEntity<>(String.valueOf(result > 0), HttpStatus.OK);
}
/**
* 获取目录
* @param platformId 平台ID
* @param parentId 目录父ID
* @return
*/
@ApiOperation("获取目录")
@ApiImplicitParams({
@ApiImplicitParam(name = "platformId", value = "平台ID", dataTypeClass = String.class, required = true),
@ApiImplicitParam(name = "parentId", value = "目录父ID", dataTypeClass = String.class, required = true),
})
@GetMapping("/catalog")
@ResponseBody
public ResponseEntity<WVPResult<List<PlatformCatalog>>> getCatalogByPlatform(String platformId, String parentId){
if (logger.isDebugEnabled()) {
logger.debug("查询目录,platformId: {}, parentId: {}", platformId, parentId);
}
List<PlatformCatalog> platformCatalogList = storager.getChildrenCatalogByPlatform(platformId, parentId);
// 查询下属的国标通道
List<PlatformCatalog> catalogsForChannel = storager.queryChannelInParentPlatformAndCatalog(platformId, parentId);
List<PlatformCatalog> catalogsForStream = storager.queryStreamInParentPlatformAndCatalog(platformId, parentId);
platformCatalogList.addAll(catalogsForChannel);
platformCatalogList.addAll(catalogsForStream);
WVPResult<List<PlatformCatalog>> result = new WVPResult<>();
result.setCode(0);
result.setMsg("success");
result.setData(platformCatalogList);
return new ResponseEntity<>(result, HttpStatus.OK);
}
/**
* 添加目录
* @param platformCatalog 目录
* @return
*/
@ApiOperation("添加目录")
@ApiImplicitParams({
@ApiImplicitParam(name = "platformCatalog", value = "目录信息", dataTypeClass = PlatformCatalog.class, required = true),
})
@PostMapping("/catalog/add")
@ResponseBody
public ResponseEntity<WVPResult<List<PlatformCatalog>>> addCatalog(@RequestBody PlatformCatalog platformCatalog){
if (logger.isDebugEnabled()) {
logger.debug("添加目录,{}", JSON.toJSONString(platformCatalog));
}
PlatformCatalog platformCatalogInStore = storager.getCatalog(platformCatalog.getId());
WVPResult<List<PlatformCatalog>> result = new WVPResult<>();
if (platformCatalogInStore != null) {
result.setCode(-1);
result.setMsg( platformCatalog.getId() + " already exists");
return new ResponseEntity<>(result, HttpStatus.OK);
}
int addResult = storager.addCatalog(platformCatalog);
if (addResult > 0) {
result.setCode(0);
result.setMsg("success");
return new ResponseEntity<>(result, HttpStatus.OK);
}else {
result.setCode(-500);
result.setMsg("save error");
return new ResponseEntity<>(result, HttpStatus.OK);
}
}
/**
* 编辑目录
* @param platformCatalog 目录
* @return
*/
@ApiOperation("编辑目录")
@ApiImplicitParams({
@ApiImplicitParam(name = "platformCatalog", value = "目录信息", dataTypeClass = PlatformCatalog.class, required = true),
})
@PostMapping("/catalog/edit")
@ResponseBody
public ResponseEntity<WVPResult<List<PlatformCatalog>>> editCatalog(@RequestBody PlatformCatalog platformCatalog){
if (logger.isDebugEnabled()) {
logger.debug("编辑目录,{}", JSON.toJSONString(platformCatalog));
}
PlatformCatalog platformCatalogInStore = storager.getCatalog(platformCatalog.getId());
WVPResult<List<PlatformCatalog>> result = new WVPResult<>();
result.setCode(0);
if (platformCatalogInStore == null) {
result.setMsg( platformCatalog.getId() + " not exists");
return new ResponseEntity<>(result, HttpStatus.OK);
}
int addResult = storager.updateCatalog(platformCatalog);
if (addResult > 0) {
result.setMsg("success");
return new ResponseEntity<>(result, HttpStatus.OK);
}else {
result.setMsg("save error");
return new ResponseEntity<>(result, HttpStatus.OK);
}
}
/**
* 删除目录
* @param id 目录Id
* @return
*/
@ApiOperation("删除目录")
@ApiImplicitParams({
@ApiImplicitParam(name = "id", value = "目录Id", dataTypeClass = String.class, required = true),
})
@DeleteMapping("/catalog/del")
@ResponseBody
public ResponseEntity<WVPResult<List<PlatformCatalog>>> delCatalog(String id){
if (logger.isDebugEnabled()) {
logger.debug("删除目录,{}", id);
}
int delResult = storager.delCatalog(id);
WVPResult<List<PlatformCatalog>> result = new WVPResult<>();
result.setCode(0);
if (delResult > 0) {
result.setMsg("success");
return new ResponseEntity<>(result, HttpStatus.OK);
}else {
result.setMsg("save error");
return new ResponseEntity<>(result, HttpStatus.OK);
}
}
/**
* 删除关联
* @param platformCatalog 关联的信息
* @return
*/
@ApiOperation("删除关联")
@ApiImplicitParams({
@ApiImplicitParam(name = "platformCatalog", value = "关联的信息", dataTypeClass = PlatformCatalog.class, required = true),
})
@DeleteMapping("/catalog/relation/del")
@ResponseBody
public ResponseEntity<WVPResult<List<PlatformCatalog>>> delRelation(@RequestBody PlatformCatalog platformCatalog){
if (logger.isDebugEnabled()) {
logger.debug("删除关联,{}", JSON.toJSONString(platformCatalog));
}
int delResult = storager.delRelation(platformCatalog);
WVPResult<List<PlatformCatalog>> result = new WVPResult<>();
result.setCode(0);
if (delResult > 0) {
result.setMsg("success");
return new ResponseEntity<>(result, HttpStatus.OK);
}else {
result.setMsg("save error");
return new ResponseEntity<>(result, HttpStatus.OK);
}
}
/**
* 修改默认目录
* @param platformId 平台Id
* @param catalogId 目录Id
* @return
*/
@ApiOperation("修改默认目录")
@ApiImplicitParams({
@ApiImplicitParam(name = "platformId", value = "平台Id", dataTypeClass = String.class, required = true),
@ApiImplicitParam(name = "catalogId", value = "目录Id", dataTypeClass = String.class, required = true),
})
@PostMapping("/catalog/default/update")
@ResponseBody
public ResponseEntity<WVPResult<String>> setDefaultCatalog(String platformId, String catalogId){
if (logger.isDebugEnabled()) {
logger.debug("修改默认目录,{},{}", platformId, catalogId);
}
int updateResult = storager.setDefaultCatalog(platformId, catalogId);
WVPResult<String> result = new WVPResult<>();
result.setCode(0);
if (updateResult > 0) {
result.setMsg("success");
return new ResponseEntity<>(result, HttpStatus.OK);
}else {
result.setMsg("save error");
return new ResponseEntity<>(result, HttpStatus.OK);
}
}
}

View File

@@ -40,6 +40,11 @@ public class ChannelReduce {
*/
private String platformId;
/**
* 目录Id
*/
private String catalogId;
public String getChannelId() {
return channelId;
@@ -96,4 +101,12 @@ public class ChannelReduce {
public void setPlatformId(String platformId) {
this.platformId = platformId;
}
public String getCatalogId() {
return catalogId;
}
public void setCatalogId(String catalogId) {
this.catalogId = catalogId;
}
}

View File

@@ -4,6 +4,7 @@ import java.util.List;
public class UpdateChannelParam {
private String platformId;
private String catalogId;
private List<ChannelReduce> channelReduces;
public String getPlatformId() {
@@ -21,4 +22,12 @@ public class UpdateChannelParam {
public void setChannelReduces(List<ChannelReduce> channelReduces) {
this.channelReduces = channelReduces;
}
public String getCatalogId() {
return catalogId;
}
public void setCatalogId(String catalogId) {
this.catalogId = catalogId;
}
}