修复数据库语法兼容以及redis接收推流信息导入

This commit is contained in:
648540858
2024-09-14 15:19:07 +08:00
parent a36c427394
commit 800d6c926a
21 changed files with 120 additions and 274 deletions

View File

@@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.streamPush.bean.RedisPushStreamMessage;
import com.genersoft.iot.vmp.streamPush.bean.StreamPush;
import com.genersoft.iot.vmp.streamPush.service.IStreamPushService;
import com.genersoft.iot.vmp.utils.DateUtil;
@@ -24,10 +25,12 @@ import java.util.concurrent.ConcurrentLinkedQueue;
* @Auther: JiangFeng
* @Date: 2022/8/16 11:32
* @Description: 接收redis发送的推流设备列表更新通知
* 监听 SUBSCRIBE VM_MSG_PUSH_STREAM_LIST_CHANGE
* 发布 PUBLISH VM_MSG_PUSH_STREAM_LIST_CHANGE '[{"app":1000,"stream":10000000,"gbId":"12345678901234567890","name":"A6","status":false},{"app":1000,"stream":10000021,"gbId":"24212345671381000021","name":"终端9273","status":false},{"app":1000,"stream":10000022,"gbId":"24212345671381000022","name":"终端9434","status":true},{"app":1000,"stream":10000025,"gbId":"24212345671381000025","name":"华为M10","status":false},{"app":1000,"stream":10000051,"gbId":"11111111111381111122","name":"终端9720","status":false}]'
*/
@Slf4j
@Component
public class RedisPushStreamStatusListMsgListener implements MessageListener {
public class RedisPushStreamListMsgListener implements MessageListener {
@Resource
private IMediaServerService mediaServerService;
@@ -51,7 +54,7 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener {
while (!taskQueue.isEmpty()) {
Message msg = taskQueue.poll();
try {
List<StreamPush> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPush.class);
List<RedisPushStreamMessage> streamPushItems = JSON.parseArray(new String(msg.getBody()), RedisPushStreamMessage.class);
//查询全部的app+stream 用于判断是添加还是修改
Map<String, StreamPush> allAppAndStream = streamPushService.getAllAppAndStreamMap();
Map<String, StreamPush> allGBId = streamPushService.getAllGBId();
@@ -61,31 +64,39 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener {
*/
List<StreamPush> streamPushItemForSave = new ArrayList<>();
List<StreamPush> streamPushItemForUpdate = new ArrayList<>();
for (StreamPush streamPush : streamPushItems) {
String app = streamPush.getApp();
String stream = streamPush.getStream();
for (RedisPushStreamMessage pushStreamMessage : streamPushItems) {
String app = pushStreamMessage.getApp();
String stream = pushStreamMessage.getStream();
boolean contains = allAppAndStream.containsKey(app + stream);
//不存在就添加
if (!contains) {
if (allGBId.containsKey(streamPush.getGbDeviceId())) {
StreamPush streamPushInDb = allGBId.get(streamPush.getGbDeviceId());
if (allGBId.containsKey(pushStreamMessage.getGbId())) {
StreamPush streamPushInDb = allGBId.get(pushStreamMessage.getGbId());
log.warn("[REDIS消息-推流设备列表更新-INSERT] 国标编号重复: {}, 已分配给{}/{}",
streamPushInDb.getGbDeviceId(), streamPushInDb.getApp(), streamPushInDb.getStream());
continue;
}
StreamPush streamPush = pushStreamMessage.buildstreamPush();
streamPush.setCreateTime(DateUtil.getNow());
streamPush.setUpdateTime(DateUtil.getNow());
streamPush.setMediaServerId(mediaServerService.getDefaultMediaServer().getId());
streamPushItemForSave.add(streamPush);
allGBId.put(streamPush.getGbDeviceId(), streamPush);
} else {
if (allGBId.containsKey(streamPush.getGbDeviceId())
&& (!allGBId.get(streamPush.getGbDeviceId()).getApp().equals(streamPush.getApp())
|| !allGBId.get(streamPush.getGbDeviceId()).getStream().equals(streamPush.getStream()))) {
StreamPush streamPushInDb = allGBId.get(streamPush.getGbDeviceId());
StreamPush streamPushForGbDeviceId = allGBId.get(pushStreamMessage.getGbId());
if (streamPushForGbDeviceId != null
&& (!streamPushForGbDeviceId.getApp().equals(pushStreamMessage.getApp())
|| !streamPushForGbDeviceId.getStream().equals(pushStreamMessage.getStream()))) {
StreamPush streamPushInDb = allGBId.get(pushStreamMessage.getGbId());
log.warn("[REDIS消息-推流设备列表更新-UPDATE] 国标编号重复: {}, 已分配给{}/{}",
streamPush.getGbDeviceId(), streamPushInDb.getApp(), streamPushInDb.getStream());
pushStreamMessage.getGbId(), streamPushInDb.getApp(), streamPushInDb.getStream());
continue;
}
StreamPush streamPush = allAppAndStream.get(app + stream);
streamPush.setUpdateTime(DateUtil.getNow());
streamPush.setGbDeviceId(pushStreamMessage.getGbId());
streamPush.setGbName(pushStreamMessage.getName());
streamPush.setGbStatus(pushStreamMessage.isStatus()?"ON":"OFF");
//存在就只修改 name和gbId
streamPushItemForUpdate.add(streamPush);
}