对需要向设备发起请求的http请求,使用缓存,等待设备请求返回的时候一次性释放所有请求

This commit is contained in:
648540858
2021-11-02 10:40:29 +08:00
parent ce931ab8a4
commit 3b21f385cd
17 changed files with 436 additions and 210 deletions

View File

@@ -13,7 +13,9 @@ import java.util.List;
public class RecordInfo {
private String deviceId;
private String channelId;
private String name;
private int sumNum;
@@ -52,4 +54,11 @@ public class RecordInfo {
this.recordList = recordList;
}
public String getChannelId() {
return channelId;
}
public void setChannelId(String channelId) {
this.channelId = channelId;
}
}

View File

@@ -54,11 +54,9 @@ public class CheckForAllRecordsThread extends Thread {
// 自然顺序排序, 元素进行升序排列
this.recordInfo.getRecordList().sort(Comparator.naturalOrder());
RequestMessage msg = new RequestMessage();
String deviceId = recordInfo.getDeviceId();
msg.setDeviceId(deviceId);
msg.setType(DeferredResultHolder.CALLBACK_CMD_RECORDINFO);
msg.setKey(DeferredResultHolder.CALLBACK_CMD_RECORDINFO + recordInfo.getDeviceId() + recordInfo.getChannelId());
msg.setData(recordInfo);
deferredResultHolder.invokeResult(msg);
deferredResultHolder.invokeAllResult(msg);
logger.info("处理完成,返回结果");
MessageRequestProcessor.threadNameList.remove(cacheKey);
}

View File

@@ -1,6 +1,8 @@
package com.genersoft.iot.vmp.gb28181.transmit.callback;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.http.HttpStatus;
@@ -45,22 +47,72 @@ public class DeferredResultHolder {
public static final String CALLBACK_CMD_BROADCAST = "CALLBACK_BROADCAST";
private Map<String, DeferredResult> map = new ConcurrentHashMap<String, DeferredResult>();
private Map<String, Map<String, DeferredResult>> map = new ConcurrentHashMap<>();
public void put(String key, DeferredResult result) {
map.put(key, result);
public void put(String key, String id, DeferredResult result) {
Map<String, DeferredResult> deferredResultMap = map.get(key);
if (deferredResultMap == null) {
deferredResultMap = new ConcurrentHashMap<>();
map.put(key, deferredResultMap);
}
deferredResultMap.put(id, result);
}
public DeferredResult get(String key) {
return map.get(key);
public DeferredResult get(String key, String id) {
Map<String, DeferredResult> deferredResultMap = map.get(key);
if (deferredResultMap == null) return null;
return deferredResultMap.get(id);
}
public boolean exist(String key, String id){
if (key == null) return false;
Map<String, DeferredResult> deferredResultMap = map.get(key);
if (id == null) {
return deferredResultMap != null;
}else {
return deferredResultMap != null && deferredResultMap.get(id) != null;
}
}
/**
* 释放单个请求
* @param msg
*/
public void invokeResult(RequestMessage msg) {
DeferredResult result = map.get(msg.getId());
Map<String, DeferredResult> deferredResultMap = map.get(msg.getKey());
if (deferredResultMap == null) {
return;
}
DeferredResult result = deferredResultMap.get(msg.getId());
if (result == null) {
return;
}
result.setResult(new ResponseEntity<>(msg.getData(),HttpStatus.OK));
deferredResultMap.remove(msg.getId());
if (deferredResultMap.size() == 0) {
map.remove(msg.getKey());
}
}
/**
* 释放所有的请求
* @param msg
*/
public void invokeAllResult(RequestMessage msg) {
Map<String, DeferredResult> deferredResultMap = map.get(msg.getKey());
if (deferredResultMap == null) {
return;
}
Set<String> ids = deferredResultMap.keySet();
for (String id : ids) {
DeferredResult result = deferredResultMap.get(id);
if (result == null) {
return;
}
result.setResult(new ResponseEntity<>(msg.getData(),HttpStatus.OK));
}
map.remove(msg.getKey());
}
}

View File

@@ -9,12 +9,10 @@ public class RequestMessage {
private String id;
private String deviceId;
private String type;
private String key;
private Object data;
public String getId() {
return id;
}
@@ -23,22 +21,12 @@ public class RequestMessage {
this.id = id;
}
public String getDeviceId() {
return deviceId;
public void setKey(String key) {
this.key = key;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
this.id = type + deviceId;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
this.id = type + deviceId;
public String getKey() {
return key;
}
public Object getData() {

View File

@@ -173,12 +173,12 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
logger.warn("处理MobilePosition移动位置消息时未找到设备信息");
response404Ack(evt);
return;
}
Element rootElement = getRootElement(evt, device.getCharset());
MobilePosition mobilePosition = new MobilePosition();
Element deviceIdElement = rootElement.element("DeviceID");
if (!StringUtils.isEmpty(device.getName())) {
mobilePosition.setDeviceName(device.getName());
}
@@ -227,11 +227,17 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
*/
private void processMessageDeviceStatus(RequestEvent evt) {
try {
String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
logger.warn("处理DeviceStatus设备状态Message时未找到设备信息");
response404Ack(evt);
return;
}
Element rootElement = getRootElement(evt);
String name = rootElement.getName();
Element deviceIdElement = rootElement.element("DeviceID");
String deviceId = deviceIdElement.getText();
Device device = storager.queryVideoDevice(deviceId);
String channelId = deviceIdElement.getText();
if (name.equalsIgnoreCase("Query")) { // 区分是Response——查询响应还是Query——查询请求
logger.info("接收到DeviceStatus查询消息");
FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
@@ -258,10 +264,9 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
logger.debug(json.toJSONString());
}
RequestMessage msg = new RequestMessage();
msg.setDeviceId(deviceId);
msg.setType(DeferredResultHolder.CALLBACK_CMD_DEVICESTATUS);
msg.setKey(DeferredResultHolder.CALLBACK_CMD_DEVICESTATUS + deviceId + channelId);
msg.setData(json);
deferredResultHolder.invokeResult(msg);
deferredResultHolder.invokeAllResult(msg);
if (offLineDetector.isOnline(deviceId)) {
publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE);
@@ -282,8 +287,15 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
*/
private void processMessageDeviceControl(RequestEvent evt) {
try {
String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
logger.warn("处理DeviceControl设备状态Message未找到设备信息");
response404Ack(evt);
return;
}
Element rootElement = getRootElement(evt);
String deviceId = XmlUtil.getText(rootElement, "DeviceID");
String channelId = XmlUtil.getText(rootElement, "DeviceID");
//String result = XmlUtil.getText(rootElement, "Result");
// 回复200 OK
responseAck(evt);
@@ -295,10 +307,10 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
logger.debug(json.toJSONString());
}
RequestMessage msg = new RequestMessage();
msg.setDeviceId(deviceId);
msg.setType(DeferredResultHolder.CALLBACK_CMD_DEVICECONTROL);
String key = DeferredResultHolder.CALLBACK_CMD_DEVICECONTROL + deviceId + channelId;
msg.setKey(key);
msg.setData(json);
deferredResultHolder.invokeResult(msg);
deferredResultHolder.invokeAllResult(msg);
} else {
// 此处是上级发出的DeviceControl指令
String platformId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
@@ -344,8 +356,8 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
// 云台/前端控制命令
if (!StringUtils.isEmpty(XmlUtil.getText(rootElement,"PTZCmd")) && !deviceId.equals(targetGBId)) {
String cmdString = XmlUtil.getText(rootElement,"PTZCmd");
Device device = storager.queryVideoDeviceByPlatformIdAndChannelId(platformId, deviceId);
cmder.fronEndCmd(device, deviceId, cmdString);
Device deviceForPlatform = storager.queryVideoDeviceByPlatformIdAndChannelId(platformId, deviceId);
cmder.fronEndCmd(deviceForPlatform, deviceId, cmdString);
}
}
} catch (ParseException | SipException | InvalidArgumentException | DocumentException e) {
@@ -360,8 +372,16 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
*/
private void processMessageDeviceConfig(RequestEvent evt) {
try {
String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
// 查询设备是否存在
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
logger.warn("处理DeviceConfig设备状态Message消息时未找到设备信息");
response404Ack(evt);
return;
}
Element rootElement = getRootElement(evt);
String deviceId = XmlUtil.getText(rootElement, "DeviceID");
String channelId = XmlUtil.getText(rootElement, "DeviceID");
// 回复200 OK
responseAck(evt);
if (rootElement.getName().equals("Response")) {
@@ -371,11 +391,11 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
if (logger.isDebugEnabled()) {
logger.debug(json.toJSONString());
}
String key = DeferredResultHolder.CALLBACK_CMD_DEVICECONFIG + deviceId + channelId;
RequestMessage msg = new RequestMessage();
msg.setDeviceId(deviceId);
msg.setType(DeferredResultHolder.CALLBACK_CMD_DEVICECONFIG);
msg.setKey(key);
msg.setData(json);
deferredResultHolder.invokeResult(msg);
deferredResultHolder.invokeAllResult(msg);
} else {
// 此处是上级发出的DeviceConfig指令
}
@@ -391,8 +411,17 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
*/
private void processMessageConfigDownload(RequestEvent evt) {
try {
String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
// 查询设备是否存在
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
logger.warn("处理ConfigDownload设备状态Message时未找到设备信息");
response404Ack(evt);
return;
}
Element rootElement = getRootElement(evt);
String deviceId = XmlUtil.getText(rootElement, "DeviceID");
String channelId = XmlUtil.getText(rootElement, "DeviceID");
String key = DeferredResultHolder.CALLBACK_CMD_CONFIGDOWNLOAD + deviceId + channelId;
// 回复200 OK
responseAck(evt);
if (rootElement.getName().equals("Response")) {
@@ -403,10 +432,9 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
logger.debug(json.toJSONString());
}
RequestMessage msg = new RequestMessage();
msg.setDeviceId(deviceId);
msg.setType(DeferredResultHolder.CALLBACK_CMD_CONFIGDOWNLOAD);
msg.setKey(key);
msg.setData(json);
deferredResultHolder.invokeResult(msg);
deferredResultHolder.invokeAllResult(msg);
} else {
// 此处是上级发出的DeviceConfig指令
}
@@ -422,8 +450,17 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
*/
private void processMessagePresetQuery(RequestEvent evt) {
try {
String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
// 查询设备是否存在
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
logger.warn("处理PresetQuery预置位列表Message时未找到设备信息");
response404Ack(evt);
return;
}
Element rootElement = getRootElement(evt);
String deviceId = XmlUtil.getText(rootElement, "DeviceID");
String channelId = XmlUtil.getText(rootElement, "DeviceID");
String key = DeferredResultHolder.CALLBACK_CMD_PRESETQUERY + deviceId + channelId;
// 回复200 OK
responseAck(evt);
if (rootElement.getName().equals("Response")) {// !StringUtils.isEmpty(result)) {
@@ -434,10 +471,9 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
logger.debug(json.toJSONString());
}
RequestMessage msg = new RequestMessage();
msg.setDeviceId(deviceId);
msg.setType(DeferredResultHolder.CALLBACK_CMD_PRESETQUERY);
msg.setKey(key);
msg.setData(json);
deferredResultHolder.invokeResult(msg);
deferredResultHolder.invokeAllResult(msg);
} else {
// 此处是上级发出的DeviceControl指令
}
@@ -453,11 +489,19 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
*/
private void processMessageDeviceInfo(RequestEvent evt) {
try {
String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
// 查询设备是否存在
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
logger.warn("处理DeviceInfo设备信息Message时未找到设备信息");
response404Ack(evt);
return;
}
Element rootElement = getRootElement(evt);
String requestName = rootElement.getName();
Element deviceIdElement = rootElement.element("DeviceID");
String deviceId = deviceIdElement.getTextTrim();
Device device = storager.queryVideoDevice(deviceId);
String channelId = deviceIdElement.getTextTrim();
String key = DeferredResultHolder.CALLBACK_CMD_DEVICEINFO + deviceId + channelId;
if (device != null ) {
rootElement = getRootElement(evt, device.getCharset());
}
@@ -492,10 +536,9 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
storager.updateDevice(device);
RequestMessage msg = new RequestMessage();
msg.setDeviceId(deviceId);
msg.setType(DeferredResultHolder.CALLBACK_CMD_DEVICEINFO);
msg.setKey(key);
msg.setData(device);
deferredResultHolder.invokeResult(msg);
deferredResultHolder.invokeAllResult(msg);
// 回复200 OK
responseAck(evt);
if (offLineDetector.isOnline(deviceId)) {
@@ -514,12 +557,22 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
*/
private void processMessageCatalogList(RequestEvent evt) {
try {
String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
// 查询设备是否存在
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
logger.warn("处理DeviceInfo设备信息Message时未找到设备信息");
response404Ack(evt);
return;
}
Element rootElement = getRootElement(evt);
String name = rootElement.getName();
Element deviceIdElement = rootElement.element("DeviceID");
String deviceId = deviceIdElement.getText();
String channelId = deviceIdElement.getText();
Element deviceListElement = rootElement.element("DeviceList");
String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId;
FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
AddressImpl address = (AddressImpl) fromHeader.getAddress();
SipUri uri = (SipUri) address.getURI();
@@ -581,10 +634,6 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
} else {
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
return;
}
deviceListElement = getRootElement(evt, device.getCharset()).element("DeviceList");
Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
if (deviceListIterator != null) {
@@ -674,10 +723,9 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
}
RequestMessage msg = new RequestMessage();
msg.setDeviceId(deviceId);
msg.setType(DeferredResultHolder.CALLBACK_CMD_CATALOG);
msg.setKey(key);
msg.setData(device);
deferredResultHolder.invokeResult(msg);
deferredResultHolder.invokeAllResult(msg);
// 回复200 OK
responseAck(evt);
if (offLineDetector.isOnline(deviceId)) {
@@ -701,11 +749,13 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
logger.warn("处理alarm设备报警信息未找到设备信息");
response404Ack(evt);
return;
}
Element rootElement = getRootElement(evt, device.getCharset());
Element deviceIdElement = rootElement.element("DeviceID");
String channelId = deviceIdElement.getText().toString();
String key = DeferredResultHolder.CALLBACK_CMD_ALARM + deviceId + channelId;
// 回复200 OK
responseAck(evt);
@@ -770,10 +820,9 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
logger.debug(json.toJSONString());
}
RequestMessage msg = new RequestMessage();
msg.setDeviceId(deviceId);
msg.setType(DeferredResultHolder.CALLBACK_CMD_ALARM);
msg.setKey(key);
msg.setData(json);
deferredResultHolder.invokeResult(msg);
deferredResultHolder.invokeAllResult(msg);
}
} catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
e.printStackTrace();
@@ -787,10 +836,14 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
*/
private void processMessageKeepAlive(RequestEvent evt) {
try {
Element rootElement = getRootElement(evt);
String deviceId = XmlUtil.getText(rootElement, "DeviceID");
String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
// 查询设备是否存在
Device device = storager.queryVideoDevice(deviceId);
Element rootElement = getRootElement(evt);
String channelId = XmlUtil.getText(rootElement, "DeviceID");
// 检查设备是否存在并在线, 不在线则设置为在线
if (device != null ) {
// 回复200 OK
@@ -831,18 +884,29 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
*/
private void processMessageRecordInfo(RequestEvent evt) {
try {
String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
// 查询设备是否存在
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
logger.warn("处理DeviceInfo设备信息Message时未找到设备信息");
response404Ack(evt);
return;
}
// 回复200 OK
responseAck(evt);
String uuid = UUID.randomUUID().toString().replace("-", "");
RecordInfo recordInfo = new RecordInfo();
Element rootElement = getRootElement(evt);
Element deviceIdElement = rootElement.element("DeviceID");
String deviceId = deviceIdElement.getText().toString();
Device device = storager.queryVideoDevice(deviceId);
String channelId = deviceIdElement.getText().toString();
String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + deviceId + channelId;
if (device != null ) {
rootElement = getRootElement(evt, device.getCharset());
}
recordInfo.setDeviceId(deviceId);
recordInfo.setChannelId(channelId);
recordInfo.setName(XmlUtil.getText(rootElement, "Name"));
if (XmlUtil.getText(rootElement, "SumNum")== null || XmlUtil.getText(rootElement, "SumNum") =="") {
recordInfo.setSumNum(0);
@@ -854,10 +918,9 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
if (recordListElement == null || recordInfo.getSumNum() == 0) {
logger.info("无录像数据");
RequestMessage msg = new RequestMessage();
msg.setDeviceId(deviceId);
msg.setType(DeferredResultHolder.CALLBACK_CMD_RECORDINFO);
msg.setKey(key);
msg.setData(recordInfo);
deferredResultHolder.invokeResult(msg);
deferredResultHolder.invokeAllResult(msg);
} else {
Iterator<Element> recordListIterator = recordListElement.elementIterator();
List<RecordItem> recordList = new ArrayList<RecordItem>();
@@ -956,10 +1019,20 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
*/
private void processMessageMediaStatus(RequestEvent evt){
try {
String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
// 查询设备是否存在
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
logger.warn("处理DeviceInfo设备信息Message时未找到设备信息");
response404Ack(evt);
return;
}
// 回复200 OK
responseAck(evt);
Element rootElement = getRootElement(evt);
String deviceId = XmlUtil.getText(rootElement, "DeviceID");
String channelId = XmlUtil.getText(rootElement, "DeviceID");
String NotifyType =XmlUtil.getText(rootElement, "NotifyType");
if (NotifyType.equals("121")){
logger.info("媒体播放完毕,通知关流");
@@ -981,8 +1054,19 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
*/
private void processMessageBroadcast(RequestEvent evt) {
try {
String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
// 查询设备是否存在
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
logger.warn("处理DeviceInfo设备信息Message时未找到设备信息");
response404Ack(evt);
return;
}
Element rootElement = getRootElement(evt);
String deviceId = XmlUtil.getText(rootElement, "DeviceID");
String channelId = XmlUtil.getText(rootElement, "DeviceID");
String key = DeferredResultHolder.CALLBACK_CMD_BROADCAST + deviceId + channelId;
// 回复200 OK
responseAck(evt);
if (rootElement.getName().equals("Response")) {
@@ -993,10 +1077,9 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
logger.debug(json.toJSONString());
}
RequestMessage msg = new RequestMessage();
msg.setDeviceId(deviceId);
msg.setType(DeferredResultHolder.CALLBACK_CMD_BROADCAST);
msg.setKey(key);
msg.setData(json);
deferredResultHolder.invokeResult(msg);
deferredResultHolder.invokeAllResult(msg);
} else {
// 此处是上级发出的Broadcast指令
}