存储部分使用sqlite代替redis

This commit is contained in:
panlinlin
2020-12-31 13:15:50 +08:00
parent 00e61d9a80
commit 3d83775468
30 changed files with 933 additions and 956 deletions

View File

@@ -138,16 +138,25 @@ public class SipLayer implements SipListener {
// TODO Auto-generated catch block
e.printStackTrace();
}
if (evt.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) {
CallIdHeader callIdHeader = (CallIdHeader)evt.getResponse().getHeader(CallIdHeader.NAME);
if (callIdHeader != null) {
SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId());
if (subscribe != null) {
subscribe.response(evt);
}
}
}
// } else if (status == Response.TRYING) {
// trying不会回复
} else if ((status >= 100) && (status < 200)) {
// 增加其它无需回复的响应如101、180等
} else {
logger.warn("接收到失败的response响应status" + status + ",message:" + response.getReasonPhrase()/* .getContent().toString()*/);
if (evt.getResponse() != null && sipSubscribe.getSize() > 0 ) {
if (evt.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) {
CallIdHeader callIdHeader = (CallIdHeader)evt.getResponse().getHeader(CallIdHeader.NAME);
if (callIdHeader != null) {
SipSubscribe.Event subscribe = sipSubscribe.getSubscribe(callIdHeader.getCallId());
SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId());
if (subscribe != null) {
subscribe.response(evt);
}

View File

@@ -1,11 +1,17 @@
package com.genersoft.iot.vmp.gb28181.bean;
import java.util.Date;
import java.util.List;
import java.util.Map;
public class Device {
/**
* 数据库存储ID
*/
private int id;
/**
* 设备Id
*/
@@ -55,14 +61,24 @@ public class Device {
*/
private int online;
/**
* 通道列表
*/
// private Map<String,DeviceChannel> channelMap;
/**
* 注册时间
*/
private Long registerTimeMillis;
/**
* 通道个数
*/
private int channelCount;
private List<String> channelList;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getDeviceId() {
return deviceId;
@@ -144,11 +160,11 @@ public class Device {
this.channelCount = channelCount;
}
public List<String> getChannelList() {
return channelList;
public Long getRegisterTimeMillis() {
return registerTimeMillis;
}
public void setChannelList(List<String> channelList) {
this.channelList = channelList;
public void setRegisterTimeMillis(Long registerTimeMillis) {
this.registerTimeMillis = registerTimeMillis;
}
}

View File

@@ -2,10 +2,17 @@ package com.genersoft.iot.vmp.gb28181.bean;
public class DeviceChannel {
/**
* 通道id
*/
private String channelId;
/**
* 设备id
*/
private String deviceId;
/**
* 通道名
@@ -146,13 +153,15 @@ public class DeviceChannel {
/**
* 是否含有音频
*/
private boolean hasAudio;
private boolean hasAudio;
/**
* 是否正在播放
*/
private boolean play;
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
public void setPTZType(int PTZType) {
this.PTZType = PTZType;
@@ -387,14 +396,6 @@ public class DeviceChannel {
this.hasAudio = hasAudio;
}
public boolean isPlay() {
return play;
}
public void setPlay(boolean play) {
this.play = play;
}
public String getStreamId() {
return streamId;
}

View File

@@ -17,21 +17,34 @@ public class SipSubscribe {
private final static Logger logger = LoggerFactory.getLogger(SipSubscribe.class);
private Map<String, SipSubscribe.Event> allSubscribes = new ConcurrentHashMap<>();
private Map<String, SipSubscribe.Event> errorSubscribes = new ConcurrentHashMap<>();
private Map<String, SipSubscribe.Event> okSubscribes = new ConcurrentHashMap<>();
public interface Event {
void response(ResponseEvent event);
}
public void addSubscribe(String key, SipSubscribe.Event event) {
allSubscribes.put(key, event);
public void addErrorSubscribe(String key, SipSubscribe.Event event) {
errorSubscribes.put(key, event);
}
public SipSubscribe.Event getSubscribe(String key) {
return allSubscribes.get(key);
public void addOkSubscribe(String key, SipSubscribe.Event event) {
okSubscribes.put(key, event);
}
public int getSize(){
return allSubscribes.size();
public SipSubscribe.Event getErrorSubscribe(String key) {
return errorSubscribes.get(key);
}
public SipSubscribe.Event getOkSubscribe(String key) {
return okSubscribes.get(key);
}
public int getErrorSubscribesSize(){
return errorSubscribes.size();
}
public int getOkSubscribesSize(){
return okSubscribes.size();
}
}

View File

@@ -4,13 +4,10 @@ import javax.sip.RequestEvent;
import javax.sip.ResponseEvent;
import javax.sip.SipProvider;
import javax.sip.header.CSeqHeader;
import javax.sip.header.CallIdHeader;
import javax.sip.header.Header;
import javax.sip.message.Request;
import javax.sip.message.Response;
import com.alibaba.fastjson.JSON;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -59,6 +56,9 @@ public class SIPProcessorFactory {
@Autowired
private IVideoManagerStorager storager;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private EventPublisher publisher;
@@ -143,6 +143,7 @@ public class SIPProcessorFactory {
processor.setOffLineDetector(offLineDetector);
processor.setCmder(cmder);
processor.setStorager(storager);
processor.setRedisCatchStorage(redisCatchStorage);
return processor;
} else {
return new OtherRequestProcessor();

View File

@@ -25,6 +25,8 @@ public class DeferredResultHolder {
public static final String CALLBACK_CMD_PlAY = "CALLBACK_PLAY";
public static final String CALLBACK_CMD_STOP = "CALLBACK_STOP";
private Map<String, DeferredResult> map = new ConcurrentHashMap<String, DeferredResult>();
public void put(String key, DeferredResult result) {

View File

@@ -101,8 +101,9 @@ public interface ISIPCommander {
*
* @param ssrc ssrc
*/
void streamByeCmd(String ssrc, SipSubscribe.Event okEvent);
void streamByeCmd(String ssrc);
/**
* 语音广播
*

View File

@@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl;
import java.text.ParseException;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -12,11 +13,13 @@ import javax.sip.header.ViaHeader;
import javax.sip.message.Request;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.MediaServerConfig;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,6 +56,9 @@ public class SIPCommander implements ISIPCommander {
@Autowired
private IVideoManagerStorager storager;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
@Qualifier(value="tcpSipProvider")
@@ -229,7 +235,7 @@ public class SIPCommander implements ISIPCommander {
Request request = headerProvider.createMessageRequest(device, ptzXml.toString(), "ViaPtzBranch", "FromPtzTag", "ToPtzTag");
transmitRequest(device, request, null);
transmitRequest(device, request);
return true;
} catch (SipException | ParseException | InvalidArgumentException e) {
e.printStackTrace();
@@ -264,7 +270,7 @@ public class SIPCommander implements ISIPCommander {
ptzXml.append("</Control>\r\n");
Request request = headerProvider.createMessageRequest(device, ptzXml.toString(), "ViaPtzBranch", "FromPtzTag", "ToPtzTag");
transmitRequest(device, request, null);
transmitRequest(device, request);
return true;
} catch (SipException | ParseException | InvalidArgumentException e) {
e.printStackTrace();
@@ -291,7 +297,7 @@ public class SIPCommander implements ISIPCommander {
streamId = String.format("%08x", Integer.parseInt(ssrc)).toUpperCase();
}
String streamMode = device.getStreamMode().toUpperCase();
MediaServerConfig mediaInfo = storager.getMediaInfo();
MediaServerConfig mediaInfo = redisCatchStorage.getMediaInfo();
if (mediaInfo == null) {
logger.warn("点播时发现ZLM尚未连接...");
return;
@@ -344,6 +350,9 @@ public class SIPCommander implements ISIPCommander {
}
content.append("y="+ssrc+"\r\n");//ssrc
// String fromTag = UUID.randomUUID().toString();
// Request request = headerProvider.createInviteRequest(device, channelId, content.toString(), null, fromTag, null, ssrc);
Request request = headerProvider.createInviteRequest(device, channelId, content.toString(), null, "live", null, ssrc);
ClientTransaction transaction = transmitRequest(device, request, errorEvent);
@@ -372,7 +381,7 @@ public class SIPCommander implements ISIPCommander {
public void playbackStreamCmd(Device device, String channelId, String startTime, String endTime, ZLMHttpHookSubscribe.Event event
, SipSubscribe.Event errorEvent) {
try {
MediaServerConfig mediaInfo = storager.getMediaInfo();
MediaServerConfig mediaInfo = redisCatchStorage.getMediaInfo();
String ssrc = streamSession.createPlayBackSsrc();
String streamId = String.format("%08x", Integer.parseInt(ssrc)).toUpperCase();
// 添加订阅
@@ -457,17 +466,28 @@ public class SIPCommander implements ISIPCommander {
e.printStackTrace();
}
}
/**
* 视频流停止
*
*/
@Override
public void streamByeCmd(String streamId) {
public void streamByeCmd(String ssrc) {
streamByeCmd(ssrc, null);
}
@Override
public void streamByeCmd(String streamId, SipSubscribe.Event okEvent) {
try {
ClientTransaction transaction = streamSession.get(streamId);
// 服务重启后
if (transaction == null) {
StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId);
if (streamInfo != null) {
}
return;
}
@@ -475,6 +495,9 @@ public class SIPCommander implements ISIPCommander {
if (dialog == null) {
return;
}
Request byeRequest = dialog.createRequest(Request.BYE);
SipURI byeURI = (SipURI) byeRequest.getRequestURI();
String vh = transaction.getRequest().getHeader(ViaHeader.NAME).toString();
@@ -491,7 +514,14 @@ public class SIPCommander implements ISIPCommander {
} else if("UDP".equals(protocol)) {
clientTransaction = udpSipProvider.getNewClientTransaction(byeRequest);
}
CallIdHeader callIdHeader = (CallIdHeader) byeRequest.getHeader(CallIdHeader.NAME);
if (okEvent != null) {
sipSubscribe.addOkSubscribe(callIdHeader.getCallId(), okEvent);
}
dialog.sendRequest(clientTransaction);
streamSession.remove(streamId);
zlmrtpServerFactory.closeRTPServer(streamId);
} catch (TransactionDoesNotExistException e) {
@@ -612,7 +642,7 @@ public class SIPCommander implements ISIPCommander {
Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "ViaDeviceInfoBranch", "FromDeviceInfoTag", "ToDeviceInfoTag");
transmitRequest(device, request, null);
transmitRequest(device, request);
} catch (SipException | ParseException | InvalidArgumentException e) {
e.printStackTrace();
@@ -676,7 +706,7 @@ public class SIPCommander implements ISIPCommander {
Request request = headerProvider.createMessageRequest(device, recordInfoXml.toString(), "ViaRecordInfoBranch", "FromRecordInfoTag", null);
transmitRequest(device, request, null);
transmitRequest(device, request);
} catch (SipException | ParseException | InvalidArgumentException e) {
e.printStackTrace();
return false;
@@ -727,8 +757,16 @@ public class SIPCommander implements ISIPCommander {
// TODO Auto-generated method stub
return false;
}
private ClientTransaction transmitRequest(Device device, Request request) throws SipException {
return transmitRequest(device, request, null, null);
}
private ClientTransaction transmitRequest(Device device, Request request, SipSubscribe.Event errorEvent) throws SipException {
return transmitRequest(device, request, errorEvent, null);
}
private ClientTransaction transmitRequest(Device device, Request request, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws SipException {
ClientTransaction clientTransaction = null;
if("TCP".equals(device.getTransport())) {
clientTransaction = tcpSipProvider.getNewClientTransaction(request);
@@ -736,10 +774,14 @@ public class SIPCommander implements ISIPCommander {
clientTransaction = udpSipProvider.getNewClientTransaction(request);
}
// 添加订阅
CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME);
// 添加错误订阅
if (errorEvent != null) {
CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME);
sipSubscribe.addSubscribe(callIdHeader.getCallId(), errorEvent);
sipSubscribe.addErrorSubscribe(callIdHeader.getCallId(), errorEvent);
}
// 添加订阅
if (okEvent != null) {
sipSubscribe.addOkSubscribe(callIdHeader.getCallId(), okEvent);
}
clientTransaction.sendRequest();
@@ -747,6 +789,8 @@ public class SIPCommander implements ISIPCommander {
}
@Override
public void closeRTPServer(Device device, String channelId) {
if (rtpEnable) {

View File

@@ -10,6 +10,7 @@ import javax.sip.SipException;
import javax.sip.message.Request;
import javax.sip.message.Response;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
@@ -48,6 +49,8 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
private IVideoManagerStorager storager;
private IRedisCatchStorage redisCatchStorage;
private EventPublisher publisher;
private RedisUtil redis;
@@ -451,9 +454,9 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
String NotifyType =XmlUtil.getText(rootElement, "NotifyType");
if (NotifyType.equals("121")){
logger.info("媒体播放完毕,通知关流");
StreamInfo streamInfo = storager.queryPlaybackByDevice(deviceId, "*");
StreamInfo streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, "*");
if (streamInfo != null) {
storager.stopPlayback(streamInfo);
redisCatchStorage.stopPlayback(streamInfo);
cmder.streamByeCmd(streamInfo.getStreamId());
}
}
@@ -507,4 +510,11 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
this.offLineDetector = offLineDetector;
}
public IRedisCatchStorage getRedisCatchStorage() {
return redisCatchStorage;
}
public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) {
this.redisCatchStorage = redisCatchStorage;
}
}

View File

@@ -141,9 +141,15 @@ public class RegisterRequestProcessor extends SIPRequestAbstractProcessor {
// 下发catelog查询目录
if (registerFlag == 1 && device != null) {
logger.info("注册成功! deviceId:" + device.getDeviceId());
boolean exists = storager.exists(device.getDeviceId());
device.setRegisterTimeMillis(System.currentTimeMillis());
storager.updateDevice(device);
publisher.onlineEventPublish(device.getDeviceId(), VideoManagerConstants.EVENT_ONLINE_REGISTER);
handler.onRegister(device);
// 只有第一次注册才更新通道
if (!exists) {
handler.onRegister(device);
}
} else if (registerFlag == 2) {
logger.info("注销成功! deviceId:" + device.getDeviceId());
publisher.outlineEventPublish(device.getDeviceId(), VideoManagerConstants.EVENT_OUTLINE_UNREGISTER);