修复订阅sip消息导致的内存溢出以及录像回放的问题
This commit is contained in:
@@ -9,6 +9,7 @@ import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.sip.*;
|
||||
import javax.sip.header.CallIdHeader;
|
||||
import javax.sip.header.Header;
|
||||
import javax.sip.message.Response;
|
||||
|
||||
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
|
||||
@@ -168,7 +169,8 @@ public class SipLayer implements SipListener {
|
||||
if (callIdHeader != null) {
|
||||
SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId());
|
||||
if (subscribe != null) {
|
||||
subscribe.response(evt);
|
||||
SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(evt);
|
||||
subscribe.response(eventResult);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -181,7 +183,8 @@ public class SipLayer implements SipListener {
|
||||
if (callIdHeader != null) {
|
||||
SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId());
|
||||
if (subscribe != null) {
|
||||
subscribe.response(evt);
|
||||
SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(evt);
|
||||
subscribe.response(eventResult);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -204,7 +207,11 @@ public class SipLayer implements SipListener {
|
||||
@Override
|
||||
public void processTimeout(TimeoutEvent timeoutEvent) {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
CallIdHeader callIdHeader = timeoutEvent.getClientTransaction().getDialog().getCallId();
|
||||
String callId = callIdHeader.getCallId();
|
||||
SipSubscribe.Event errorSubscribe = sipSubscribe.getErrorSubscribe(callId);
|
||||
SipSubscribe.EventResult<TimeoutEvent> timeoutEventEventResult = new SipSubscribe.EventResult<>(timeoutEvent);
|
||||
errorSubscribe.response(timeoutEventEventResult);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -220,6 +227,7 @@ public class SipLayer implements SipListener {
|
||||
@Override
|
||||
public void processIOException(IOExceptionEvent exceptionEvent) {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -235,6 +243,11 @@ public class SipLayer implements SipListener {
|
||||
@Override
|
||||
public void processTransactionTerminated(TransactionTerminatedEvent transactionTerminatedEvent) {
|
||||
// TODO Auto-generated method stub
|
||||
// CallIdHeader callIdHeader = transactionTerminatedEvent.getClientTransaction().getDialog().getCallId();
|
||||
// String callId = callIdHeader.getCallId();
|
||||
// SipSubscribe.Event errorSubscribe = sipSubscribe.getErrorSubscribe(callId);
|
||||
// SipSubscribe.EventResult<TransactionTerminatedEvent> eventResult = new SipSubscribe.EventResult<>(transactionTerminatedEvent);
|
||||
// errorSubscribe.response(eventResult);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -250,6 +263,11 @@ public class SipLayer implements SipListener {
|
||||
@Override
|
||||
public void processDialogTerminated(DialogTerminatedEvent dialogTerminatedEvent) {
|
||||
// TODO Auto-generated method stub
|
||||
// CallIdHeader callIdHeader = dialogTerminatedEvent.getDialog().getCallId();
|
||||
// String callId = callIdHeader.getCallId();
|
||||
// SipSubscribe.Event errorSubscribe = sipSubscribe.getErrorSubscribe(callId);
|
||||
// SipSubscribe.EventResult<DialogTerminatedEvent> eventResult = new SipSubscribe.EventResult<>(dialogTerminatedEvent);
|
||||
// errorSubscribe.response(eventResult);
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -1,38 +1,121 @@
|
||||
package com.genersoft.iot.vmp.gb28181.event;
|
||||
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.sip.ResponseEvent;
|
||||
import javax.sip.*;
|
||||
import java.util.Calendar;
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
@Component
|
||||
public class SipSubscribe {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(SipSubscribe.class);
|
||||
|
||||
private Map<String, SipSubscribe.Event> errorSubscribes = new ConcurrentHashMap<>();
|
||||
|
||||
private Map<String, SipSubscribe.Event> okSubscribes = new ConcurrentHashMap<>();
|
||||
|
||||
private Map<String, Date> timeSubscribes = new ConcurrentHashMap<>();
|
||||
|
||||
// @Scheduled(cron="*/5 * * * * ?") //每五秒执行一次
|
||||
@Scheduled(cron="0 * */1 * * ?") //每小时执行一次
|
||||
public void execute(){
|
||||
logger.info("[定时任务] 清理过期的订阅信息");
|
||||
Calendar calendar = Calendar.getInstance();
|
||||
calendar.setTime(new Date());
|
||||
calendar.set(Calendar.HOUR, calendar.get(Calendar.HOUR) - 1);
|
||||
for (String key : timeSubscribes.keySet()) {
|
||||
if (timeSubscribes.get(key).before(calendar.getTime())){
|
||||
logger.info("[定时任务] 清理过期的订阅信息: {}", key);
|
||||
errorSubscribes.remove(key);
|
||||
okSubscribes.remove(key);
|
||||
timeSubscribes.remove(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public interface Event {
|
||||
void response(ResponseEvent event);
|
||||
void response(EventResult eventResult);
|
||||
}
|
||||
|
||||
public static class EventResult<EventObject>{
|
||||
public int statusCode;
|
||||
public String type;
|
||||
public String msg;
|
||||
public String callId;
|
||||
public Dialog dialog;
|
||||
public EventObject event;
|
||||
|
||||
public EventResult() {
|
||||
}
|
||||
|
||||
public EventResult(EventObject event) {
|
||||
this.event = event;
|
||||
if (event instanceof ResponseEvent) {
|
||||
ResponseEvent responseEvent = (ResponseEvent)event;
|
||||
this.type = "response";
|
||||
this.msg = responseEvent.getResponse().getReasonPhrase();
|
||||
this.statusCode = responseEvent.getResponse().getStatusCode();
|
||||
this.callId = responseEvent.getDialog().getCallId().getCallId();
|
||||
this.dialog = responseEvent.getDialog();
|
||||
}else if (event instanceof TimeoutEvent) {
|
||||
TimeoutEvent timeoutEvent = (TimeoutEvent)event;
|
||||
this.type = "timeout";
|
||||
this.msg = "消息超时未回复";
|
||||
this.statusCode = -1024;
|
||||
this.callId = timeoutEvent.getClientTransaction().getDialog().getCallId().getCallId();
|
||||
this.dialog = timeoutEvent.getClientTransaction().getDialog();
|
||||
}else if (event instanceof TransactionTerminatedEvent) {
|
||||
TransactionTerminatedEvent transactionTerminatedEvent = (TransactionTerminatedEvent)event;
|
||||
this.type = "transactionTerminated";
|
||||
this.msg = "事务已结束";
|
||||
this.statusCode = -1024;
|
||||
this.callId = transactionTerminatedEvent.getClientTransaction().getDialog().getCallId().getCallId();
|
||||
this.dialog = transactionTerminatedEvent.getClientTransaction().getDialog();
|
||||
}else if (event instanceof DialogTerminatedEvent) {
|
||||
DialogTerminatedEvent dialogTerminatedEvent = (DialogTerminatedEvent)event;
|
||||
this.type = "dialogTerminated";
|
||||
this.msg = "会话已结束";
|
||||
this.statusCode = -1024;
|
||||
this.callId = dialogTerminatedEvent.getDialog().getCallId().getCallId();
|
||||
this.dialog = dialogTerminatedEvent.getDialog();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void addErrorSubscribe(String key, SipSubscribe.Event event) {
|
||||
errorSubscribes.put(key, event);
|
||||
timeSubscribes.put(key, new Date());
|
||||
}
|
||||
|
||||
public void addOkSubscribe(String key, SipSubscribe.Event event) {
|
||||
okSubscribes.put(key, event);
|
||||
timeSubscribes.put(key, new Date());
|
||||
}
|
||||
|
||||
public SipSubscribe.Event getErrorSubscribe(String key) {
|
||||
return errorSubscribes.get(key);
|
||||
}
|
||||
|
||||
public void removeErrorSubscribe(String key) {
|
||||
errorSubscribes.remove(key);
|
||||
timeSubscribes.remove(key);
|
||||
}
|
||||
|
||||
public SipSubscribe.Event getOkSubscribe(String key) {
|
||||
return okSubscribes.get(key);
|
||||
}
|
||||
|
||||
public void removeOkSubscribe(String key) {
|
||||
okSubscribes.remove(key);
|
||||
timeSubscribes.remove(key);
|
||||
}
|
||||
public int getErrorSubscribesSize(){
|
||||
return errorSubscribes.size();
|
||||
}
|
||||
|
||||
@@ -75,8 +75,8 @@ public class PlatformKeepaliveExpireEventLister implements ApplicationListener<P
|
||||
redisCatchStorage.updatePlatformKeepalive(parentPlatform);
|
||||
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
|
||||
|
||||
sipSubscribe.addOkSubscribe(callId, (ResponseEvent responseEvent) ->{
|
||||
if (responseEvent.getResponse().getStatusCode() == Response.OK) {
|
||||
sipSubscribe.addOkSubscribe(callId, (SipSubscribe.EventResult eventResult) ->{
|
||||
if (eventResult.statusCode == Response.OK) {
|
||||
// 收到心跳响应信息,
|
||||
parentPlatformCatch.setKeepAliveReply(0);
|
||||
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
|
||||
|
||||
@@ -427,8 +427,8 @@ public class SIPCommander implements ISIPCommander {
|
||||
mediaServerService.releaseSsrc(mediaServerItem, ssrcInfo.getSsrc());
|
||||
errorEvent.response(e);
|
||||
}), e ->{
|
||||
streamSession.put(device.getDeviceId(), channelId ,ssrcInfo.getSsrc(), finalStreamId, mediaServerItem.getId(),e.getClientTransaction());
|
||||
streamSession.put(device.getDeviceId(), channelId , e.getDialog());
|
||||
streamSession.put(device.getDeviceId(), channelId ,ssrcInfo.getSsrc(), finalStreamId, mediaServerItem.getId(), ((ResponseEvent)e.event).getClientTransaction());
|
||||
streamSession.put(device.getDeviceId(), channelId , e.dialog);
|
||||
});
|
||||
|
||||
|
||||
@@ -535,9 +535,9 @@ public class SIPCommander implements ISIPCommander {
|
||||
Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader, ssrcInfo.getSsrc());
|
||||
|
||||
transmitRequest(device, request, errorEvent, okEvent -> {
|
||||
Dialog dialog = okEvent.getClientTransaction().getDialog();
|
||||
streamSession.put(device.getDeviceId(), channelId, ssrcInfo.getSsrc(), ssrcInfo.getStreamId(), mediaServerItem.getId(), okEvent.getClientTransaction());
|
||||
streamSession.put(device.getDeviceId(), channelId, dialog);
|
||||
ResponseEvent responseEvent = (ResponseEvent) okEvent.event;
|
||||
streamSession.put(device.getDeviceId(), channelId, ssrcInfo.getSsrc(), ssrcInfo.getStreamId(), mediaServerItem.getId(), responseEvent.getClientTransaction());
|
||||
streamSession.put(device.getDeviceId(), channelId, okEvent.dialog);
|
||||
});
|
||||
} catch ( SipException | ParseException | InvalidArgumentException e) {
|
||||
e.printStackTrace();
|
||||
@@ -667,6 +667,10 @@ public class SIPCommander implements ISIPCommander {
|
||||
ClientTransaction transaction = streamSession.getTransaction(deviceId, channelId);
|
||||
if (transaction == null) {
|
||||
logger.warn("[ {} -> {}]停止视频流的时候发现事务已丢失", deviceId, channelId);
|
||||
SipSubscribe.EventResult<Object> eventResult = new SipSubscribe.EventResult<>();
|
||||
if (okEvent != null) {
|
||||
okEvent.response(eventResult);
|
||||
}
|
||||
return;
|
||||
}
|
||||
SIPDialog dialog = streamSession.getDialog(deviceId, channelId);
|
||||
@@ -1506,11 +1510,17 @@ public class SIPCommander implements ISIPCommander {
|
||||
CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME);
|
||||
// 添加错误订阅
|
||||
if (errorEvent != null) {
|
||||
sipSubscribe.addErrorSubscribe(callIdHeader.getCallId(), errorEvent);
|
||||
sipSubscribe.addErrorSubscribe(callIdHeader.getCallId(), (eventResult -> {
|
||||
errorEvent.response(eventResult);
|
||||
sipSubscribe.removeErrorSubscribe(eventResult.callId);
|
||||
}));
|
||||
}
|
||||
// 添加订阅
|
||||
if (okEvent != null) {
|
||||
sipSubscribe.addOkSubscribe(callIdHeader.getCallId(), okEvent);
|
||||
sipSubscribe.addOkSubscribe(callIdHeader.getCallId(), eventResult ->{
|
||||
okEvent.response(eventResult);
|
||||
sipSubscribe.removeOkSubscribe(eventResult.callId);
|
||||
});
|
||||
}
|
||||
|
||||
clientTransaction.sendRequest();
|
||||
|
||||
@@ -100,7 +100,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
|
||||
if (event != null) {
|
||||
logger.info("向上级平台 [ {} ] 注册发上错误: {} ",
|
||||
parentPlatform.getServerGBId(),
|
||||
event.getResponse().getReasonPhrase());
|
||||
event.msg);
|
||||
}
|
||||
if (errorEvent != null ) {
|
||||
errorEvent.response(event);
|
||||
|
||||
@@ -235,7 +235,7 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor {
|
||||
// 未知错误。直接转发设备点播的错误
|
||||
Response response = null;
|
||||
try {
|
||||
response = getMessageFactory().createResponse(event.getResponse().getStatusCode(), evt.getRequest());
|
||||
response = getMessageFactory().createResponse(event.statusCode, evt.getRequest());
|
||||
ServerTransaction serverTransaction = getServerTransaction(evt);
|
||||
serverTransaction.sendResponse(response);
|
||||
if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete();
|
||||
|
||||
@@ -806,7 +806,7 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
|
||||
storager.insertMobilePosition(mobilePosition);
|
||||
}
|
||||
}
|
||||
System.out.println("存储报警信息、报警分类");
|
||||
logger.debug("存储报警信息、报警分类");
|
||||
// 存储报警信息、报警分类
|
||||
deviceAlarmService.add(deviceAlarm);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user