优化内存溢出的问题

优化zlm集群默认zlm的存储
添加报警的存储以及相关接口
添加单元测试
This commit is contained in:
648540858
2021-08-04 18:00:22 +08:00
parent 6560a7ad38
commit 724b288232
21 changed files with 17061 additions and 2298 deletions

View File

@@ -3,6 +3,11 @@ package com.genersoft.iot.vmp.gb28181.bean;
public class DeviceAlarm {
/**
* 数据库id
*/
private String id;
/**
* 设备Id
*/
@@ -45,6 +50,14 @@ public class DeviceAlarm {
private String alarmType;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getDeviceId() {
return deviceId;
}

View File

@@ -9,6 +9,7 @@ import javax.sip.message.Response;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.service.IDeviceAlarmService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.gb28181.transmit.response.impl.*;
@@ -73,6 +74,9 @@ public class SIPProcessorFactory {
@Autowired
private SIPCommanderFroPlatform cmderFroPlatform;
@Autowired
private IDeviceAlarmService deviceAlarmService;
@Autowired
private RedisUtil redis;
@@ -178,6 +182,7 @@ public class SIPProcessorFactory {
processor.setOffLineDetector(offLineDetector);
processor.setCmder(cmder);
processor.setCmderFroPlatform(cmderFroPlatform);
processor.setDeviceAlarmService(deviceAlarmService);
processor.setStorager(storager);
processor.setRedisCatchStorage(redisCatchStorage);
return processor;

View File

@@ -1,11 +1,7 @@
package com.genersoft.iot.vmp.gb28181.transmit.request.impl;
import javax.sip.*;
import javax.sip.address.SipURI;
import javax.sip.Dialog;
import javax.sip.DialogState;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
@@ -107,7 +103,9 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor {
*/
private void responseAck(RequestEvent evt) throws SipException, InvalidArgumentException, ParseException {
Response response = getMessageFactory().createResponse(Response.OK, evt.getRequest());
getServerTransaction(evt).sendResponse(response);
ServerTransaction serverTransaction = getServerTransaction(evt);
serverTransaction.sendResponse(response);
if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete();
}
public IRedisCatchStorage getRedisCatchStorage() {

View File

@@ -1,10 +1,7 @@
package com.genersoft.iot.vmp.gb28181.transmit.request.impl;
import javax.sdp.*;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.SipFactory;
import javax.sip.*;
import javax.sip.address.Address;
import javax.sip.address.SipURI;
import javax.sip.header.*;
@@ -239,7 +236,9 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor {
Response response = null;
try {
response = getMessageFactory().createResponse(event.getResponse().getStatusCode(), evt.getRequest());
getServerTransaction(evt).sendResponse(response);
ServerTransaction serverTransaction = getServerTransaction(evt);
serverTransaction.sendResponse(response);
if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete();
} catch (ParseException | SipException | InvalidArgumentException e) {
e.printStackTrace();
}
@@ -384,13 +383,17 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor {
*/
private void responseAck(RequestEvent evt, int statusCode) throws SipException, InvalidArgumentException, ParseException {
Response response = getMessageFactory().createResponse(statusCode, evt.getRequest());
getServerTransaction(evt).sendResponse(response);
ServerTransaction serverTransaction = getServerTransaction(evt);
serverTransaction.sendResponse(response);
if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete();
}
private void responseAck(RequestEvent evt, int statusCode, String msg) throws SipException, InvalidArgumentException, ParseException {
Response response = getMessageFactory().createResponse(statusCode, evt.getRequest());
response.setReasonPhrase(msg);
getServerTransaction(evt).sendResponse(response);
ServerTransaction serverTransaction = getServerTransaction(evt);
serverTransaction.sendResponse(response);
if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete();
}
/**

View File

@@ -4,17 +4,12 @@ import java.io.ByteArrayInputStream;
import java.text.ParseException;
import java.util.*;
import javax.sip.*;
import javax.sip.address.SipURI;
import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
import javax.sip.InvalidArgumentException;
import javax.sip.ListeningPoint;
import javax.sip.ObjectInUseException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.SipProvider;
import javax.sip.message.Request;
import javax.sip.message.Response;
@@ -35,6 +30,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcesso
import com.genersoft.iot.vmp.gb28181.utils.DateUtil;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.service.IDeviceAlarmService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.utils.GpsUtil;
@@ -84,6 +80,8 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
private DeviceOffLineDetector offLineDetector;
private IDeviceAlarmService deviceAlarmService;
private final static String CACHE_RECORDINFO_KEY = "CACHE_RECORDINFO_";
private static final String MESSAGE_KEEP_ALIVE = "Keepalive";
@@ -738,7 +736,9 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
storager.insertMobilePosition(mobilePosition);
}
}
// TODO: 需要实现存储报警信息、报警分类
System.out.println("存储报警信息、报警分类");
// 存储报警信息、报警分类
deviceAlarmService.add(deviceAlarm);
if (offLineDetector.isOnline(deviceId)) {
publisher.deviceAlarmEventPublish(deviceAlarm);
@@ -779,7 +779,9 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
}else{
logger.warn("收到[ "+deviceId+" ]心跳信息, 但是设备不存在, 回复404");
Response response = getMessageFactory().createResponse(Response.NOT_FOUND, evt.getRequest());
getServerTransaction(evt).sendResponse(response);
ServerTransaction serverTransaction = getServerTransaction(evt);
serverTransaction.sendResponse(response);
if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete();
}
// if (device != null && device.getOnline() == 1) {
@@ -987,7 +989,9 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
*/
private void responseAck(RequestEvent evt) throws SipException, InvalidArgumentException, ParseException {
Response response = getMessageFactory().createResponse(Response.OK, evt.getRequest());
getServerTransaction(evt).sendResponse(response);
ServerTransaction serverTransaction = getServerTransaction(evt);
serverTransaction.sendResponse(response);
if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete();
}
/***
@@ -999,7 +1003,9 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
*/
private void response404Ack(RequestEvent evt) throws SipException, InvalidArgumentException, ParseException {
Response response = getMessageFactory().createResponse(Response.NOT_FOUND, evt.getRequest());
getServerTransaction(evt).sendResponse(response);
ServerTransaction serverTransaction = getServerTransaction(evt);
serverTransaction.sendResponse(response);
if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete();
}
private Element getRootElement(RequestEvent evt) throws DocumentException {
@@ -1049,4 +1055,8 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
public void setCmderFroPlatform(SIPCommanderFroPlatform cmderFroPlatform) {
this.cmderFroPlatform = cmderFroPlatform;
}
public void setDeviceAlarmService(IDeviceAlarmService deviceAlarmService) {
this.deviceAlarmService = deviceAlarmService;
}
}

View File

@@ -6,6 +6,7 @@ import java.util.Iterator;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.ServerTransaction;
import javax.sip.SipException;
import javax.sip.message.Request;
import javax.sip.message.Response;
@@ -342,7 +343,9 @@ public class NotifyRequestProcessor extends SIPRequestAbstractProcessor {
*/
private void response200Ok(RequestEvent evt) throws SipException, InvalidArgumentException, ParseException {
Response response = getMessageFactory().createResponse(Response.OK, evt.getRequest());
getServerTransaction(evt).sendResponse(response);
ServerTransaction serverTransaction = getServerTransaction(evt);
serverTransaction.sendResponse(response);
if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete();
}
private Element getRootElement(RequestEvent evt) throws DocumentException {

View File

@@ -7,6 +7,7 @@ import java.util.Locale;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.ServerTransaction;
import javax.sip.SipException;
import javax.sip.header.AuthorizationHeader;
import javax.sip.header.ContactHeader;
@@ -112,7 +113,9 @@ public class RegisterRequestProcessor extends SIPRequestAbstractProcessor {
ExpiresHeader expiresHeader = (ExpiresHeader) request.getHeader(Expires.NAME);
if (expiresHeader == null) {
response = getMessageFactory().createResponse(Response.BAD_REQUEST, request);
getServerTransaction(evt).sendResponse(response);
ServerTransaction serverTransaction = getServerTransaction(evt);
serverTransaction.sendResponse(response);
if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete();
return;
}
// 添加Contact头
@@ -159,7 +162,9 @@ public class RegisterRequestProcessor extends SIPRequestAbstractProcessor {
}
}
getServerTransaction(evt).sendResponse(response);
ServerTransaction serverTransaction = getServerTransaction(evt);
serverTransaction.sendResponse(response);
if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete();
// 注册成功
// 保存到redis
// 下发catelog查询目录

View File

@@ -27,9 +27,6 @@ public class SubscribeRequestProcessor extends SIPRequestAbstractProcessor {
* 处理SUBSCRIBE请求
*
* @param evt
* @param layer
* @param transaction
* @param config
*/
@Override
public void process(RequestEvent evt) {
@@ -46,6 +43,7 @@ public class SubscribeRequestProcessor extends SIPRequestAbstractProcessor {
ServerTransaction transaction = getServerTransaction(evt);
if (transaction != null) {
transaction.sendResponse(response);
transaction.getDialog().delete();
transaction.terminate();
} else {
logger.info("processRequest serverTransactionId is null.");

View File

@@ -21,13 +21,11 @@ public class ByeResponseProcessor implements ISIPResponseProcessor {
*
* @param evt
* @param layer
* @param transaction
* @param config
*/
@Override
public void process(ResponseEvent evt, SipLayer layer, SipConfig config) {
// TODO Auto-generated method stub
System.out.println( );
}
}