临时提交
This commit is contained in:
@@ -1,11 +1,14 @@
|
||||
package com.genersoft.iot.vmp.service.bean;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 收到redis通知修改推流通道状态
|
||||
* @author lin
|
||||
*/
|
||||
@Data
|
||||
public class PushStreamStatusChangeFromRedisDto {
|
||||
|
||||
private boolean setAllOffline;
|
||||
@@ -13,29 +16,4 @@ public class PushStreamStatusChangeFromRedisDto {
|
||||
private List<StreamPushItemFromRedis> onlineStreams;
|
||||
|
||||
private List<StreamPushItemFromRedis> offlineStreams;
|
||||
|
||||
|
||||
public boolean isSetAllOffline() {
|
||||
return setAllOffline;
|
||||
}
|
||||
|
||||
public void setSetAllOffline(boolean setAllOffline) {
|
||||
this.setAllOffline = setAllOffline;
|
||||
}
|
||||
|
||||
public List<StreamPushItemFromRedis> getOnlineStreams() {
|
||||
return onlineStreams;
|
||||
}
|
||||
|
||||
public void setOnlineStreams(List<StreamPushItemFromRedis> onlineStreams) {
|
||||
this.onlineStreams = onlineStreams;
|
||||
}
|
||||
|
||||
public List<StreamPushItemFromRedis> getOfflineStreams() {
|
||||
return offlineStreams;
|
||||
}
|
||||
|
||||
public void setOfflineStreams(List<StreamPushItemFromRedis> offlineStreams) {
|
||||
this.offlineStreams = offlineStreams;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,6 +29,10 @@ import java.text.ParseException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
/**
|
||||
* 监听 SUBSCRIBE alarm_receive
|
||||
* 发布 PUBLISH alarm_receive '{ "gbId": "", "alarmSn": 1, "alarmType": "111", "alarmDescription": "222", }'
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class RedisAlarmMsgListener implements MessageListener {
|
||||
@@ -62,7 +66,6 @@ public class RedisAlarmMsgListener implements MessageListener {
|
||||
|
||||
@Override
|
||||
public void onMessage(@NotNull Message message, byte[] bytes) {
|
||||
// 消息示例: PUBLISH alarm_receive '{ "gbId": "", "alarmSn": 1, "alarmType": "111", "alarmDescription": "222", }'
|
||||
log.info("收到来自REDIS的ALARM通知: {}", new String(message.getBody()));
|
||||
boolean isEmpty = taskQueue.isEmpty();
|
||||
taskQueue.offer(message);
|
||||
|
||||
@@ -20,6 +20,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
/**
|
||||
* 接收来自redis的GPS更新通知
|
||||
* @author lin
|
||||
* 监听: SUBSCRIBE VM_MSG_GPS
|
||||
* 发布 PUBLISH VM_MSG_GPS ''
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
|
||||
@@ -40,9 +40,7 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
|
||||
@Autowired
|
||||
private UserSetting userSetting;
|
||||
|
||||
|
||||
|
||||
private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
|
||||
private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
|
||||
|
||||
@Qualifier("taskExecutor")
|
||||
@Autowired
|
||||
@@ -59,26 +57,26 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
|
||||
while (!taskQueue.isEmpty()) {
|
||||
Message msg = taskQueue.poll();
|
||||
try {
|
||||
PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(msg.getBody(), PushStreamStatusChangeFromRedisDto.class);
|
||||
if (statusChangeFromPushStream == null) {
|
||||
PushStreamStatusChangeFromRedisDto streamStatusMessage = JSON.parseObject(msg.getBody(), PushStreamStatusChangeFromRedisDto.class);
|
||||
if (streamStatusMessage == null) {
|
||||
log.warn("[REDIS消息]推流设备状态变化消息解析失败");
|
||||
continue;
|
||||
}
|
||||
// 取消定时任务
|
||||
dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED);
|
||||
if (statusChangeFromPushStream.isSetAllOffline()) {
|
||||
if (streamStatusMessage.isSetAllOffline()) {
|
||||
// 所有设备离线
|
||||
streamPushService.allOffline();
|
||||
}
|
||||
if (statusChangeFromPushStream.getOfflineStreams() != null
|
||||
&& statusChangeFromPushStream.getOfflineStreams().size() > 0) {
|
||||
if (streamStatusMessage.getOfflineStreams() != null
|
||||
&& !streamStatusMessage.getOfflineStreams().isEmpty()) {
|
||||
// 更新部分设备离线
|
||||
streamPushService.offline(statusChangeFromPushStream.getOfflineStreams());
|
||||
streamPushService.offline(streamStatusMessage.getOfflineStreams());
|
||||
}
|
||||
if (statusChangeFromPushStream.getOnlineStreams() != null &&
|
||||
statusChangeFromPushStream.getOnlineStreams().size() > 0) {
|
||||
if (streamStatusMessage.getOnlineStreams() != null &&
|
||||
!streamStatusMessage.getOnlineStreams().isEmpty()) {
|
||||
// 更新部分设备上线
|
||||
streamPushService.online(statusChangeFromPushStream.getOnlineStreams());
|
||||
streamPushService.online(streamStatusMessage.getOnlineStreams());
|
||||
}
|
||||
}catch (Exception e) {
|
||||
log.warn("[REDIS消息-推流设备状态变化] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
|
||||
|
||||
Reference in New Issue
Block a user