@@ -3,9 +3,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify
import com.genersoft.iot.vmp.common.VideoManagerConstants ;
import com.genersoft.iot.vmp.conf.DynamicTask ;
import com.genersoft.iot.vmp.conf.UserSetting ;
import com.genersoft.iot.vmp.gb28181.bean.Device ;
import com.genersoft.iot.vmp.gb28181.bean.Platform ;
import com.genersoft.iot.vmp.gb28181.bean.RemoteAddressInfo ;
import com.genersoft.iot.vmp.gb28181.bean.* ;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent ;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler ;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler ;
@@ -18,6 +16,7 @@ import org.apache.commons.lang3.ObjectUtils;
import org.dom4j.Element ;
import org.springframework.beans.factory.InitializingBean ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.scheduling.annotation.Scheduled ;
import org.springframework.stereotype.Component ;
import javax.sip.InvalidArgumentException ;
@@ -25,6 +24,9 @@ import javax.sip.RequestEvent;
import javax.sip.SipException ;
import javax.sip.message.Response ;
import java.text.ParseException ;
import java.util.ArrayList ;
import java.util.List ;
import java.util.concurrent.ConcurrentLinkedQueue ;
/**
* 状态信息(心跳)报送
@@ -36,6 +38,8 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
private final static String cmdType = " Keepalive " ;
private final ConcurrentLinkedQueue < SipMsgInfo > taskQueue = new ConcurrentLinkedQueue < > ( ) ;
@Autowired
private NotifyMessageHandler notifyMessageHandler ;
@@ -54,68 +58,89 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
}
@Override
public void handForDevice ( RequestEvent evt , Device device , Element e lement) {
if ( device = = null ) {
// 未注册的设备不做处理
public void handForDevice ( RequestEvent evt , Device device , Element rootE lement) {
if ( taskQueue . size ( ) > = userSetting . getMaxNotifyCountQueue ( ) ) {
log . error ( " [心跳] 待处理消息队列已满 {}, 返回486 BUSY_HERE, 消息不做处理 " , userSetting . getMaxNotifyCountQueue ( ) ) ;
return ;
}
SIPRequest request = ( SIPReques t) evt . getReques t ( ) ;
log . debug ( " [收到心跳] device: {}, callId: {} " , device . getDeviceId ( ) , request . getCallIdHeader ( ) . getCallId ( ) ) ;
if ( userSetting . getGbDeviceOnline ( ) = = 0 & & ! device . isOnLine ( ) ) {
log . warn ( " [收到心跳] 设备离线,心跳不进行回复, device: {}, callId: {} " , device . getDeviceId ( ) , request . getCallIdHeader ( ) . getCallId ( ) ) ;
taskQueue . offer ( new SipMsgInfo ( ev t, d evice , rootElemen t) ) ;
}
@Scheduled ( fixedDelay = 100 )
public void executeTaskQueue ( ) {
if ( taskQueue . isEmpty ( ) ) {
return ;
}
// 回复200 OK
try {
responseAck ( request , Response . OK ) ;
} catch ( SipException | InvalidArgumentException | ParseException e ) {
log . error ( " [命令发送失败] 心跳回复: {} " , e . getMessage ( ) ) ;
List < SipMsgInfo > handlerCatchDataList = new ArrayList < > ( ) ;
int size = taskQueue . size ( ) ;
for ( int i = 0 ; i < size ; i + + ) {
SipMsgInfo poll = taskQueue . poll ( ) ;
if ( poll ! = null ) {
handlerCatchDataList . add ( poll ) ;
}
}
if ( ! ObjectUtils . isEmpty ( device . getKeepaliveTime ( ) ) & & DateUtil . getDifferenceForNow ( device . getKeepaliveTime ( ) ) < = 3000L ) {
log . info ( " [收到心跳] 心跳发送过于频繁,已忽略 device: {}, callId: {} " , device . getDeviceId ( ) , request . getCallIdHeader ( ) . getCallId ( ) ) ;
if ( handlerCatchDataList . isEmpty ( ) ) {
return ;
}
RemoteAddressInfo remoteAddressInfo = SipUtils . getRemoteAddressFromRequest ( request , userSetting . getSipUseSourceIpAsRemoteAddress ( ) ) ;
if ( ! device . getIp ( ) . equalsIgnoreCase ( remoteAddressInfo . getIp ( ) ) | | device . getPort ( ) ! = remoteAddressInfo . getPort ( ) ) {
log . info ( " [收到心跳] 设备{}地址变化, {}:{}->{} " , device . getDeviceId ( ) , remoteAddressInfo . getIp ( ) , remoteAddressInfo . getPort ( ) , request . getLocalAddress ( ) . getHostAddress ( ) ) ;
device . setPort ( remoteAddressInfo . getPort ( ) ) ;
device . setHostAddress ( remoteAddressInfo . getIp ( ) . concat ( " : " ) . concat ( String . valueOf ( remoteAddressInfo . getPort ( ) ) ) ) ;
device . setIp ( remoteAddressInfo . getIp ( ) ) ;
device . setLocalIp ( request . getLocalAddress ( ) . getHostAddress ( ) ) ;
// 设备地址变化会引起目录订阅任务失效,需要重新添加
if ( device . getSubscribeCycleForCatalog ( ) > 0 ) {
deviceService . removeCatalogSubscribe ( device , result - > {
deviceService . addCatalogSubscribe ( device ) ;
} ) ;
for ( SipMsgInfo sipMsgInfo : handlerCatchDataList ) {
if ( sipMsgInfo = = null ) {
continue ;
}
}
if ( device . getKeepaliveTime ( ) = = null ) {
device . setKeepaliveIntervalTime ( 60 ) ;
} else {
long lastTime = DateUtil . yyyy_MM_dd_HH_mm_ssToTimestamp ( device . getKeepaliveTime ( ) ) ;
if ( System . currentTimeMillis ( ) / 1000 - lastTime > 10 ) {
device . setKeepaliveIntervalTime ( Long . valueOf ( System . currentTimeMillis ( ) / 1000 - lastTime ) . intValue ( ) ) ;
RequestEvent evt = sipMsgInfo . getEvt ( ) ;
// 回复200 OK
try {
responseAck ( ( SIPRequest ) evt . getRequest ( ) , Response . OK ) ;
} catch ( SipException | InvalidArgumentException | ParseException e ) {
log . error ( " [命令发送失败] 心跳回复: {} " , e . getMessage ( ) ) ;
}
}
device . s etKeepaliveTime( DateUtil . getNow ( ) ) ;
if ( device . isOnLine ( ) ) {
deviceService . updateDevice ( device ) ;
} else {
if ( userSetting . getGbDeviceOnline ( ) = = 1 ) {
// 对于已经离线的设备判断他的注册是否已经过期
device . setOnLine ( true ) ;
device . setRegisterTime ( DateUtil . getNow ( ) ) ;
deviceService . online ( device , null ) ;
Device device = sipMsgInfo . getDevice ( ) ;
SIPRequest request = ( SIPRequest ) evt . getRequest ( ) ;
if ( ! ObjectUtils . isEmpty ( device . g etKeepaliveTime( ) ) & & DateUtil . getDifferenceForNow ( device . getKeepaliveTime ( ) ) < = 3000L ) {
log . info ( " [收到心跳] 心跳发送过于频繁,已忽略 device: {}, callId: {} " , device . getDeviceId ( ) , request . getCallIdHeader ( ) . getCallId ( ) ) ;
return ;
}
}
// 刷新过期任务
String registerExpireTaskKey = VideoManagerConstants . REGISTER_EXPIRE_TASK_KEY_PREFIX + device . getDeviceId ( ) ;
// 如果三次心跳失败,则设置设备离线
dynamicTask . startDelay ( registerExpireTaskKey , ( ) - > deviceService . offline ( device . getDeviceId ( ) , " 三次心跳失败 " ) , device . getKeepaliveIntervalTime ( ) * 1000 * 3 ) ;
RemoteAddressInfo remoteAddressInfo = SipUtils . getRemoteAddressFromRequest ( request , userSetting . getSipUseSourceIpAsRemoteAddress ( ) ) ;
if ( ! device . getIp ( ) . equalsIgnoreCase ( remoteAddressInfo . getIp ( ) ) | | device . getPort ( ) ! = remoteAddressInfo . getPort ( ) ) {
log . info ( " [收到心跳] 地址变化, {}({}), {}:{}->{} " , device . getName ( ) , device . getDeviceId ( ) , remoteAddressInfo . getIp ( ) , remoteAddressInfo . getPort ( ) , request . getLocalAddress ( ) . getHostAddress ( ) ) ;
device . setPort ( remoteAddressInfo . getPort ( ) ) ;
device . setHostAddress ( remoteAddressInfo . getIp ( ) . concat ( " : " ) . concat ( String . valueOf ( remoteAddressInfo . getPort ( ) ) ) ) ;
device . setIp ( remoteAddressInfo . getIp ( ) ) ;
device . setLocalIp ( request . getLocalAddress ( ) . getHostAddress ( ) ) ;
// 设备地址变化会引起目录订阅任务失效,需要重新添加
if ( device . getSubscribeCycleForCatalog ( ) > 0 ) {
deviceService . removeCatalogSubscribe ( device , result - > {
deviceService . addCatalogSubscribe ( device ) ;
} ) ;
}
}
if ( device . getKeepaliveTime ( ) = = null ) {
device . setKeepaliveIntervalTime ( 60 ) ;
} else {
long lastTime = DateUtil . yyyy_MM_dd_HH_mm_ssToTimestamp ( device . getKeepaliveTime ( ) ) ;
if ( System . currentTimeMillis ( ) / 1000 - lastTime > 10 ) {
device . setKeepaliveIntervalTime ( Long . valueOf ( System . currentTimeMillis ( ) / 1000 - lastTime ) . intValue ( ) ) ;
}
}
device . setKeepaliveTime ( DateUtil . getNow ( ) ) ;
if ( device . isOnLine ( ) ) {
deviceService . updateDevice ( device ) ;
} else {
if ( userSetting . getGbDeviceOnline ( ) = = 1 ) {
// 对于已经离线的设备判断他的注册是否已经过期
device . setOnLine ( true ) ;
device . setRegisterTime ( DateUtil . getNow ( ) ) ;
deviceService . online ( device , null ) ;
}
}
// 刷新过期任务
String registerExpireTaskKey = VideoManagerConstants . REGISTER_EXPIRE_TASK_KEY_PREFIX + device . getDeviceId ( ) ;
// 如果三次心跳失败,则设置设备离线
dynamicTask . startDelay ( registerExpireTaskKey , ( ) - > deviceService . offline ( device . getDeviceId ( ) , " 三次心跳失败 " ) , device . getKeepaliveIntervalTime ( ) * 1000 * 3 ) ;
}
}
@Override