设备信息增加最近注册时间和最近心跳时间,心跳超时时间变为可配置

This commit is contained in:
64850858
2021-06-07 15:11:53 +08:00
parent 641d7d8e42
commit 83411ad127
16 changed files with 130 additions and 70 deletions

View File

@@ -23,4 +23,5 @@ public class VManageBootstrap extends LogManager {
VManageBootstrap.context = SpringApplication.run(VManageBootstrap.class, args);
}
}

View File

@@ -39,7 +39,9 @@ public class VideoManagerConstants {
public static final String EVENT_ONLINE_REGISTER = "1";
public static final String EVENT_ONLINE_KEEPLIVE = "2";
public static final String EVENT_ONLINE_MESSAGE = "3";
public static final String EVENT_OUTLINE_UNREGISTER = "1";
public static final String EVENT_OUTLINE_TIMEOUT = "2";

View File

@@ -31,6 +31,9 @@ public class SipConfig {
@Value("${sip.ptz.speed:50}")
Integer speed;
@Value("${sip.keepaliveTimeOut:180}")
Integer keepaliveTimeOut;
public String getMonitorIp() {
return monitorIp;
}
@@ -63,4 +66,7 @@ public class SipConfig {
return speed;
}
public Integer getKeepaliveTimeOut() {
return keepaliveTimeOut;
}
}

View File

@@ -66,19 +66,24 @@ public class Device {
/**
* 注册时间
*/
private Long registerTimeMillis;
private String registerTime;
/**
* 心跳时间
*/
private Long KeepaliveTimeMillis;
private String keepaliveTime;
/**
* 通道个数
*/
private int channelCount;
/**
* 注册有效期
*/
private int expires;
public String getDeviceId() {
return deviceId;
}
@@ -175,19 +180,27 @@ public class Device {
this.channelCount = channelCount;
}
public Long getRegisterTimeMillis() {
return registerTimeMillis;
public String getRegisterTime() {
return registerTime;
}
public void setRegisterTimeMillis(Long registerTimeMillis) {
this.registerTimeMillis = registerTimeMillis;
public void setRegisterTime(String registerTime) {
this.registerTime = registerTime;
}
public Long getKeepaliveTimeMillis() {
return KeepaliveTimeMillis;
public String getKeepaliveTime() {
return keepaliveTime;
}
public void setKeepaliveTimeMillis(Long keepaliveTimeMillis) {
KeepaliveTimeMillis = keepaliveTimeMillis;
public void setKeepaliveTime(String keepaliveTime) {
this.keepaliveTime = keepaliveTime;
}
public int getExpires() {
return expires;
}
public void setExpires(int expires) {
this.expires = expires;
}
}

View File

@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.gb28181.event;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.event.platformKeepaliveExpire.PlatformKeepaliveExpireEvent;
import com.genersoft.iot.vmp.gb28181.event.platformNotRegister.PlatformNotRegisterEvent;
import org.springframework.beans.factory.annotation.Autowired;
@@ -22,9 +23,9 @@ public class EventPublisher {
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
public void onlineEventPublish(String deviceId, String from) {
public void onlineEventPublish(Device device, String from) {
OnlineEvent onEvent = new OnlineEvent(this);
onEvent.setDeviceId(deviceId);
onEvent.setDevice(device);
onEvent.setFrom(from);
applicationEventPublisher.publishEvent(onEvent);
}

View File

@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.gb28181.event.online;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import org.springframework.context.ApplicationEvent;
/**
@@ -18,18 +19,18 @@ public class OnlineEvent extends ApplicationEvent {
super(source);
}
private String deviceId;
private Device device;
private String from;
public String getDeviceId() {
return deviceId;
public Device getDevice() {
return device;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
public void setDevice(Device device) {
this.device = device;
}
public String getFrom() {
return from;
}

View File

@@ -1,5 +1,7 @@
package com.genersoft.iot.vmp.gb28181.event.online;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -10,6 +12,9 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @Description: 在线事件监听器,监听到离线后,修改设备离在线状态。 设备在线有两个来源:
* 1、设备主动注销发送注销指令{@link com.genersoft.iot.vmp.gb28181.transmit.request.impl.RegisterRequestProcessor}
@@ -28,39 +33,46 @@ public class OnlineEventListener implements ApplicationListener<OnlineEvent> {
@Autowired
private RedisUtil redis;
@Autowired
private SipConfig sipConfig;
private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Override
public void onApplicationEvent(OnlineEvent event) {
if (logger.isDebugEnabled()) {
logger.debug("设备上线事件触发deviceId" + event.getDeviceId() + ",from:" + event.getFrom());
logger.debug("设备上线事件触发deviceId" + event.getDevice().getDeviceId() + ",from:" + event.getFrom());
}
String key = VideoManagerConstants.KEEPLIVEKEY_PREFIX + event.getDeviceId();
boolean needUpdateStorager = false;
Device device = event.getDevice();
String key = VideoManagerConstants.KEEPLIVEKEY_PREFIX + event.getDevice().getDeviceId();
switch (event.getFrom()) {
// 注册时触发的在线事件先在redis中增加超时超时监听
case VideoManagerConstants.EVENT_ONLINE_REGISTER:
// TODO 超时时间暂时写死为180秒
redis.set(key, event.getDeviceId(), 180);
needUpdateStorager = true;
// 超时时间
redis.set(key, event.getDevice().getDeviceId(), sipConfig.getKeepaliveTimeOut());
device.setRegisterTime(format.format(new Date(System.currentTimeMillis())));
break;
// 设备主动发送心跳触发的线事件
// 设备主动发送心跳触发的线事件
case VideoManagerConstants.EVENT_ONLINE_KEEPLIVE:
boolean exist = redis.hasKey(key);
// 先判断是否还存在当设备先心跳超时后又发送心跳时redis没有监听需要增加
if (!exist) {
needUpdateStorager = true;
redis.set(key, event.getDeviceId(), 180);
redis.set(key, event.getDevice().getDeviceId(), sipConfig.getKeepaliveTimeOut());
} else {
redis.expire(key, 180);
redis.expire(key, sipConfig.getKeepaliveTimeOut());
}
device.setKeepaliveTime(format.format(new Date(System.currentTimeMillis())));
break;
// 设备主动发送消息触发的在线事件
case VideoManagerConstants.EVENT_ONLINE_MESSAGE:
break;
}
if (needUpdateStorager) {
// 处理线监听
storager.online(event.getDeviceId());
}
device.setOnline(1);
// 处理线监听
storager.updateDevice(device);
}
}

View File

@@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.request.impl;
import java.io.ByteArrayInputStream;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import javax.sip.address.SipURI;
@@ -226,7 +227,7 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
String name = rootElement.getName();
Element deviceIdElement = rootElement.element("DeviceID");
String deviceId = deviceIdElement.getText();
Device device = storager.queryVideoDevice(deviceId);
if (name.equalsIgnoreCase("Query")) { // 区分是Response——查询响应还是Query——查询请求
logger.info("接收到DeviceStatus查询消息");
FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
@@ -259,7 +260,7 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
deferredResultHolder.invokeResult(msg);
if (offLineDetector.isOnline(deviceId)) {
publisher.onlineEventPublish(deviceId, VideoManagerConstants.EVENT_ONLINE_KEEPLIVE);
publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE);
} else {
}
}
@@ -452,6 +453,7 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
String requestName = rootElement.getName();
Element deviceIdElement = rootElement.element("DeviceID");
String deviceId = deviceIdElement.getTextTrim().toString();
Device device = storager.queryVideoDevice(deviceId);
if (requestName.equals("Query")) {
logger.info("接收到DeviceInfo查询消息");
FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
@@ -468,7 +470,6 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
}
} else {
logger.debug("接收到DeviceInfo应答消息");
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
return;
}
@@ -489,7 +490,7 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
// 回复200 OK
responseAck(evt);
if (offLineDetector.isOnline(deviceId)) {
publisher.onlineEventPublish(deviceId, VideoManagerConstants.EVENT_ONLINE_KEEPLIVE);
publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE);
}
}
} catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
@@ -669,7 +670,7 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
// 回复200 OK
responseAck(evt);
if (offLineDetector.isOnline(deviceId)) {
publisher.onlineEventPublish(deviceId, VideoManagerConstants.EVENT_ONLINE_KEEPLIVE);
publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE);
}
}
}
@@ -776,7 +777,7 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
// 回复200 OK
responseAck(evt);
if (offLineDetector.isOnline(deviceId)) {
publisher.onlineEventPublish(deviceId, VideoManagerConstants.EVENT_ONLINE_KEEPLIVE);
publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_KEEPLIVE);
} else {
}
}else {

View File

@@ -216,13 +216,13 @@ public class NotifyRequestProcessor extends SIPRequestAbstractProcessor {
Element rootElement = getRootElement(evt);
Element deviceIdElement = rootElement.element("DeviceID");
String deviceId = deviceIdElement.getText();
Device device = storager.queryVideoDevice(deviceId);
Element deviceListElement = rootElement.element("DeviceList");
if (deviceListElement == null) {
return;
}
Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
if (deviceListIterator != null) {
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
return;
}
@@ -324,7 +324,7 @@ public class NotifyRequestProcessor extends SIPRequestAbstractProcessor {
// 回复200 OK
response200Ok(evt);
if (offLineDetector.isOnline(deviceId)) {
publisher.onlineEventPublish(deviceId, VideoManagerConstants.EVENT_ONLINE_KEEPLIVE);
publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE);
}
}
} catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {

View File

@@ -2,7 +2,10 @@ package com.genersoft.iot.vmp.gb28181.transmit.request.impl;
import java.security.NoSuchAlgorithmException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.util.Calendar;
import java.util.Date;
import java.util.Locale;
import javax.sip.InvalidArgumentException;
@@ -70,7 +73,11 @@ public class RegisterRequestProcessor extends SIPRequestAbstractProcessor {
boolean passwordCorrect = false;
// 注册标志 0未携带授权头或者密码错误 1注册成功 2注销成功
int registerFlag = 0;
Device device = null;
FromHeader fromHeader = (FromHeader) request.getHeader(FromHeader.NAME);
AddressImpl address = (AddressImpl) fromHeader.getAddress();
SipUri uri = (SipUri) address.getURI();
String deviceId = uri.getUser();
Device device = storager.queryVideoDevice(deviceId);
AuthorizationHeader authorhead = (AuthorizationHeader) request.getHeader(AuthorizationHeader.NAME);
// 校验密码是否正确
if (authorhead != null) {
@@ -103,13 +110,17 @@ public class RegisterRequestProcessor extends SIPRequestAbstractProcessor {
response.addHeader(dateHeader);
ExpiresHeader expiresHeader = (ExpiresHeader) request.getHeader(Expires.NAME);
if (expiresHeader == null) {
response = getMessageFactory().createResponse(Response.BAD_REQUEST, request);
getServerTransaction(evt).sendResponse(response);
return;
}
// 添加Contact头
response.addHeader(request.getHeader(ContactHeader.NAME));
// 添加Expires头
response.addHeader(request.getExpires());
// 获取到通信地址等信息
FromHeader fromHeader = (FromHeader) request.getHeader(FromHeader.NAME);
ViaHeader viaHeader = (ViaHeader) request.getHeader(ViaHeader.NAME);
String received = viaHeader.getReceived();
int rPort = viaHeader.getRPort();
@@ -119,10 +130,7 @@ public class RegisterRequestProcessor extends SIPRequestAbstractProcessor {
rPort = viaHeader.getPort();
}
//
AddressImpl address = (AddressImpl) fromHeader.getAddress();
SipUri uri = (SipUri) address.getURI();
String deviceId = uri.getUser();
device = storager.queryVideoDevice(deviceId);
if (device == null) {
device = new Device();
device.setStreamMode("UDP");
@@ -132,11 +140,12 @@ public class RegisterRequestProcessor extends SIPRequestAbstractProcessor {
device.setPort(rPort);
device.setHostAddress(received.concat(":").concat(String.valueOf(rPort)));
// 注销成功
if (expiresHeader != null && expiresHeader.getExpires() == 0) {
if (expiresHeader.getExpires() == 0) {
registerFlag = 2;
}
// 注册成功
else {
device.setExpires(expiresHeader.getExpires());
registerFlag = 1;
// 判断TCP还是UDP
boolean isTcp = false;
@@ -154,10 +163,7 @@ public class RegisterRequestProcessor extends SIPRequestAbstractProcessor {
// 下发catelog查询目录
if (registerFlag == 1 ) {
logger.info("[{}] 注册成功! deviceId:" + device.getDeviceId(), requestAddress);
device.setRegisterTimeMillis(System.currentTimeMillis());
storager.updateDevice(device);
publisher.onlineEventPublish(device.getDeviceId(), VideoManagerConstants.EVENT_ONLINE_REGISTER);
publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_REGISTER);
// 重新注册更新设备和通道,以免设备替换或更新后信息无法更新
handler.onRegister(device);
} else if (registerFlag == 2) {

View File

@@ -27,6 +27,9 @@ public interface DeviceMapper {
"ip," +
"port," +
"hostAddress," +
"expires," +
"registerTime," +
"keepaliveTime," +
"online" +
") VALUES (" +
"#{deviceId}," +
@@ -39,6 +42,9 @@ public interface DeviceMapper {
"#{ip}," +
"#{port}," +
"#{hostAddress}," +
"#{expires}," +
"#{registerTime}," +
"#{keepaliveTime}," +
"#{online}" +
")")
int add(Device device);
@@ -56,6 +62,9 @@ public interface DeviceMapper {
"<if test=\"port != null\">, port=${port}</if>" +
"<if test=\"hostAddress != null\">, hostAddress='${hostAddress}'</if>" +
"<if test=\"online != null\">, online=${online}</if>" +
"<if test=\"registerTime != null\">, registerTime='${registerTime}'</if>" +
"<if test=\"keepaliveTime != null\">, keepaliveTime='${keepaliveTime}'</if>" +
"<if test=\"expires != null\">, expires=${expires}</if>" +
"WHERE deviceId='${deviceId}'"+
" </script>"})
int update(Device device);