优化日志以及属性设置代码
This commit is contained in:
@@ -108,4 +108,8 @@ public interface IDeviceChannelService {
|
||||
void offline(DeviceChannel channel);
|
||||
|
||||
void delete(DeviceChannel channel);
|
||||
|
||||
void cleanChannelsForDevice(String deviceId);
|
||||
|
||||
boolean resetChannels(String deviceId, List<DeviceChannel> deviceChannels);
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import com.genersoft.iot.vmp.gb28181.bean.Device;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
|
||||
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
|
||||
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
|
||||
import com.genersoft.iot.vmp.gb28181.utils.Coordtransform;
|
||||
import com.genersoft.iot.vmp.service.IDeviceChannelService;
|
||||
import com.genersoft.iot.vmp.service.IInviteStreamService;
|
||||
@@ -23,12 +24,13 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.TransactionStatus;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
/**
|
||||
@@ -426,4 +428,147 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
|
||||
// }
|
||||
deviceMobilePositionMapper.batchadd(mobilePositions);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cleanChannelsForDevice(String deviceId) {
|
||||
channelMapper.cleanChannelsByDeviceId(deviceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean resetChannels(String deviceId, List<DeviceChannel> deviceChannelList) {
|
||||
if (CollectionUtils.isEmpty(deviceChannelList)) {
|
||||
return false;
|
||||
}
|
||||
List<DeviceChannel> allChannels = channelMapper.queryAllChannels(deviceId);
|
||||
Map<String,DeviceChannel> allChannelMap = new ConcurrentHashMap<>();
|
||||
if (allChannels.size() > 0) {
|
||||
for (DeviceChannel deviceChannel : allChannels) {
|
||||
allChannelMap.put(deviceChannel.getChannelId(), deviceChannel);
|
||||
}
|
||||
}
|
||||
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
|
||||
// 数据去重
|
||||
List<DeviceChannel> channels = new ArrayList<>();
|
||||
|
||||
List<DeviceChannel> updateChannels = new ArrayList<>();
|
||||
List<DeviceChannel> addChannels = new ArrayList<>();
|
||||
List<DeviceChannel> deleteChannels = new ArrayList<>();
|
||||
StringBuilder stringBuilder = new StringBuilder();
|
||||
Map<String, Integer> subContMap = new HashMap<>();
|
||||
|
||||
// 数据去重
|
||||
Set<String> gbIdSet = new HashSet<>();
|
||||
for (DeviceChannel deviceChannel : deviceChannelList) {
|
||||
if (gbIdSet.contains(deviceChannel.getChannelId())) {
|
||||
stringBuilder.append(deviceChannel.getChannelId()).append(",");
|
||||
continue;
|
||||
}
|
||||
gbIdSet.add(deviceChannel.getChannelId());
|
||||
if (allChannelMap.containsKey(deviceChannel.getChannelId())) {
|
||||
deviceChannel.setStreamId(allChannelMap.get(deviceChannel.getChannelId()).getStreamId());
|
||||
deviceChannel.setHasAudio(allChannelMap.get(deviceChannel.getChannelId()).getHasAudio());
|
||||
if (allChannelMap.get(deviceChannel.getChannelId()).isStatus() !=deviceChannel.isStatus()){
|
||||
List<String> strings = platformChannelMapper.queryParentPlatformByChannelId(deviceChannel.getChannelId());
|
||||
if (!CollectionUtils.isEmpty(strings)){
|
||||
strings.forEach(platformId->{
|
||||
eventPublisher.catalogEventPublish(platformId, deviceChannel, deviceChannel.isStatus()? CatalogEvent.ON:CatalogEvent.OFF);
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
deviceChannel.setUpdateTime(DateUtil.getNow());
|
||||
updateChannels.add(deviceChannel);
|
||||
}else {
|
||||
deviceChannel.setCreateTime(DateUtil.getNow());
|
||||
deviceChannel.setUpdateTime(DateUtil.getNow());
|
||||
addChannels.add(deviceChannel);
|
||||
}
|
||||
allChannelMap.remove(deviceChannel.getChannelId());
|
||||
channels.add(deviceChannel);
|
||||
if (!ObjectUtils.isEmpty(deviceChannel.getParentId())) {
|
||||
if (subContMap.get(deviceChannel.getParentId()) == null) {
|
||||
subContMap.put(deviceChannel.getParentId(), 1);
|
||||
}else {
|
||||
Integer count = subContMap.get(deviceChannel.getParentId());
|
||||
subContMap.put(deviceChannel.getParentId(), count++);
|
||||
}
|
||||
}
|
||||
}
|
||||
deleteChannels.addAll(allChannelMap.values());
|
||||
if (!channels.isEmpty()) {
|
||||
for (DeviceChannel channel : channels) {
|
||||
if (subContMap.get(channel.getChannelId()) != null){
|
||||
Integer count = subContMap.get(channel.getChannelId());
|
||||
if (count > 0) {
|
||||
channel.setSubCount(count);
|
||||
channel.setParental(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (stringBuilder.length() > 0) {
|
||||
logger.info("[目录查询]收到的数据存在重复: {}" , stringBuilder);
|
||||
}
|
||||
if(CollectionUtils.isEmpty(channels)){
|
||||
logger.info("通道重设,数据为空={}" , deviceChannelList);
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
int limitCount = 50;
|
||||
boolean result = false;
|
||||
if (!result && !addChannels.isEmpty()) {
|
||||
if (addChannels.size() > limitCount) {
|
||||
for (int i = 0; i < addChannels.size(); i += limitCount) {
|
||||
int toIndex = i + limitCount;
|
||||
if (i + limitCount > addChannels.size()) {
|
||||
toIndex = addChannels.size();
|
||||
}
|
||||
result = result || channelMapper.batchAdd(addChannels.subList(i, toIndex)) < 0;
|
||||
}
|
||||
}else {
|
||||
result = result || channelMapper.batchAdd(addChannels) < 0;
|
||||
}
|
||||
}
|
||||
if (!result && !updateChannels.isEmpty()) {
|
||||
if (updateChannels.size() > limitCount) {
|
||||
for (int i = 0; i < updateChannels.size(); i += limitCount) {
|
||||
int toIndex = i + limitCount;
|
||||
if (i + limitCount > updateChannels.size()) {
|
||||
toIndex = updateChannels.size();
|
||||
}
|
||||
result = result || channelMapper.batchUpdate(updateChannels.subList(i, toIndex)) < 0;
|
||||
}
|
||||
}else {
|
||||
result = result || channelMapper.batchUpdate(updateChannels) < 0;
|
||||
}
|
||||
}
|
||||
if (!result && !deleteChannels.isEmpty()) {
|
||||
System.out.println("删除: " + deleteChannels.size());
|
||||
if (deleteChannels.size() > limitCount) {
|
||||
for (int i = 0; i < deleteChannels.size(); i += limitCount) {
|
||||
int toIndex = i + limitCount;
|
||||
if (i + limitCount > deleteChannels.size()) {
|
||||
toIndex = deleteChannels.size();
|
||||
}
|
||||
result = result || channelMapper.batchDel(deleteChannels.subList(i, toIndex)) < 0;
|
||||
}
|
||||
}else {
|
||||
result = result || channelMapper.batchDel(deleteChannels) < 0;
|
||||
}
|
||||
}
|
||||
|
||||
if (result) {
|
||||
//事务回滚
|
||||
dataSourceTransactionManager.rollback(transactionStatus);
|
||||
}
|
||||
dataSourceTransactionManager.commit(transactionStatus); //手动提交
|
||||
return true;
|
||||
}catch (Exception e) {
|
||||
logger.error("未处理的异常 ", e);
|
||||
dataSourceTransactionManager.rollback(transactionStatus);
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -124,8 +124,8 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
|
||||
return allCount;
|
||||
}
|
||||
|
||||
private List<DeviceChannel> getDeviceChannelListByChannelReduceList(List<ChannelReduce> channelReduces, String catalogId, ParentPlatform platform) {
|
||||
List<DeviceChannel> deviceChannelList = new ArrayList<>();
|
||||
private List<CommonGBChannel> getDeviceChannelListByChannelReduceList(List<ChannelReduce> channelReduces, String catalogId, ParentPlatform platform) {
|
||||
List<CommonGBChannel> deviceChannelList = new ArrayList<>();
|
||||
if (channelReduces.size() > 0){
|
||||
PlatformCatalog catalog = catalogManager.selectByPlatFormAndCatalogId(platform.getServerGBId(),catalogId);
|
||||
if (catalog == null && catalogId.equals(platform.getDeviceGBId())) {
|
||||
|
||||
@@ -2,13 +2,15 @@ package com.genersoft.iot.vmp.service.redisMsg;
|
||||
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.*;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.AlarmChannelMessage;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.Device;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
|
||||
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
|
||||
import com.genersoft.iot.vmp.utils.DateUtil;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.data.redis.connection.Message;
|
||||
@@ -24,12 +26,10 @@ import java.text.ParseException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class RedisAlarmMsgListener implements MessageListener {
|
||||
|
||||
private final static Logger logger = LoggerFactory.getLogger(RedisAlarmMsgListener.class);
|
||||
|
||||
@Autowired
|
||||
private ISIPCommander commander;
|
||||
|
||||
@@ -51,7 +51,7 @@ public class RedisAlarmMsgListener implements MessageListener {
|
||||
@Override
|
||||
public void onMessage(@NotNull Message message, byte[] bytes) {
|
||||
// 消息示例: PUBLISH alarm_receive '{ "gbId": "", "alarmSn": 1, "alarmType": "111", "alarmDescription": "222", }'
|
||||
logger.info("收到来自REDIS的ALARM通知: {}", new String(message.getBody()));
|
||||
log.info("收到来自REDIS的ALARM通知: {}", new String(message.getBody()));
|
||||
boolean isEmpty = taskQueue.isEmpty();
|
||||
taskQueue.offer(message);
|
||||
if (isEmpty) {
|
||||
@@ -62,7 +62,7 @@ public class RedisAlarmMsgListener implements MessageListener {
|
||||
try {
|
||||
AlarmChannelMessage alarmChannelMessage = JSON.parseObject(msg.getBody(), AlarmChannelMessage.class);
|
||||
if (alarmChannelMessage == null) {
|
||||
logger.warn("[REDIS的ALARM通知]消息解析失败");
|
||||
log.warn("[REDIS的ALARM通知]消息解析失败");
|
||||
continue;
|
||||
}
|
||||
String gbId = alarmChannelMessage.getGbId();
|
||||
@@ -88,7 +88,7 @@ public class RedisAlarmMsgListener implements MessageListener {
|
||||
deviceAlarm.setChannelId(parentPlatform.getDeviceGBId());
|
||||
commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm);
|
||||
} catch (SipException | InvalidArgumentException | ParseException e) {
|
||||
logger.error("[命令发送失败] 国标级联 发送报警: {}", e.getMessage());
|
||||
log.error("[命令发送失败] 国标级联 发送报警: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -101,7 +101,7 @@ public class RedisAlarmMsgListener implements MessageListener {
|
||||
deviceAlarm.setChannelId(parentPlatform.getDeviceGBId());
|
||||
commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm);
|
||||
} catch (SipException | InvalidArgumentException | ParseException e) {
|
||||
logger.error("[命令发送失败] 国标级联 发送报警: {}", e.getMessage());
|
||||
log.error("[命令发送失败] 国标级联 发送报警: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -115,7 +115,7 @@ public class RedisAlarmMsgListener implements MessageListener {
|
||||
deviceAlarm.setChannelId(device.getDeviceId());
|
||||
commander.sendAlarmMessage(device, deviceAlarm);
|
||||
} catch (InvalidArgumentException | SipException | ParseException e) {
|
||||
logger.error("[命令发送失败] 发送报警: {}", e.getMessage());
|
||||
log.error("[命令发送失败] 发送报警: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -127,21 +127,21 @@ public class RedisAlarmMsgListener implements MessageListener {
|
||||
try {
|
||||
commander.sendAlarmMessage(device, deviceAlarm);
|
||||
} catch (InvalidArgumentException | SipException | ParseException e) {
|
||||
logger.error("[命令发送失败] 发送报警: {}", e.getMessage());
|
||||
log.error("[命令发送失败] 发送报警: {}", e.getMessage());
|
||||
}
|
||||
}else if (device == null && platform != null){
|
||||
try {
|
||||
commanderForPlatform.sendAlarmMessage(platform, deviceAlarm);
|
||||
} catch (InvalidArgumentException | SipException | ParseException e) {
|
||||
logger.error("[命令发送失败] 发送报警: {}", e.getMessage());
|
||||
log.error("[命令发送失败] 发送报警: {}", e.getMessage());
|
||||
}
|
||||
}else {
|
||||
logger.warn("无法确定" + gbId + "是平台还是设备");
|
||||
log.warn("无法确定" + gbId + "是平台还是设备");
|
||||
}
|
||||
}
|
||||
}catch (Exception e) {
|
||||
logger.error("未处理的异常 ", e);
|
||||
logger.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage());
|
||||
log.error("未处理的异常 ", e);
|
||||
log.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage());
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@@ -4,9 +4,8 @@ import com.alibaba.fastjson2.JSON;
|
||||
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
|
||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.data.redis.connection.Message;
|
||||
@@ -22,11 +21,10 @@ import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
* 接收来自redis的GPS更新通知
|
||||
* @author lin
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class RedisGpsMsgListener implements MessageListener {
|
||||
|
||||
private final static Logger logger = LoggerFactory.getLogger(RedisGpsMsgListener.class);
|
||||
|
||||
@Autowired
|
||||
private IRedisCatchStorage redisCatchStorage;
|
||||
|
||||
@@ -50,12 +48,12 @@ public class RedisGpsMsgListener implements MessageListener {
|
||||
Message msg = taskQueue.poll();
|
||||
try {
|
||||
GPSMsgInfo gpsMsgInfo = JSON.parseObject(msg.getBody(), GPSMsgInfo.class);
|
||||
logger.info("[REDIS的位置变化通知], {}", JSON.toJSONString(gpsMsgInfo));
|
||||
log.info("[REDIS的位置变化通知], {}", JSON.toJSONString(gpsMsgInfo));
|
||||
// 只是放入redis缓存起来
|
||||
redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo);
|
||||
}catch (Exception e) {
|
||||
logger.warn("[REDIS的位置变化通知] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
|
||||
logger.error("[REDIS的位置变化通知] 异常内容: ", e);
|
||||
log.warn("[REDIS的位置变化通知] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
|
||||
log.error("[REDIS的位置变化通知] 异常内容: ", e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@@ -2,8 +2,7 @@ package com.genersoft.iot.vmp.service.redisMsg;
|
||||
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.data.redis.connection.Message;
|
||||
@@ -20,11 +19,10 @@ import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
* 接收redis返回的推流结果
|
||||
* @author lin
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class RedisPushStreamResponseListener implements MessageListener {
|
||||
|
||||
private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamResponseListener.class);
|
||||
|
||||
private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
|
||||
|
||||
@Qualifier("taskExecutor")
|
||||
@@ -40,7 +38,7 @@ public class RedisPushStreamResponseListener implements MessageListener {
|
||||
|
||||
@Override
|
||||
public void onMessage(Message message, byte[] bytes) {
|
||||
logger.info("[REDIS消息-请求推流结果]: {}", new String(message.getBody()));
|
||||
log.info("[REDIS消息-请求推流结果]: {}", new String(message.getBody()));
|
||||
boolean isEmpty = taskQueue.isEmpty();
|
||||
taskQueue.offer(message);
|
||||
if (isEmpty) {
|
||||
@@ -50,7 +48,7 @@ public class RedisPushStreamResponseListener implements MessageListener {
|
||||
try {
|
||||
MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class);
|
||||
if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){
|
||||
logger.info("[REDIS消息-请求推流结果]:参数不全");
|
||||
log.info("[REDIS消息-请求推流结果]:参数不全");
|
||||
continue;
|
||||
}
|
||||
// 查看正在等待的invite消息
|
||||
@@ -58,8 +56,8 @@ public class RedisPushStreamResponseListener implements MessageListener {
|
||||
responseEvents.get(response.getApp() + response.getStream()).run(response);
|
||||
}
|
||||
}catch (Exception e) {
|
||||
logger.warn("[REDIS消息-请求推流结果] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
|
||||
logger.error("[REDIS消息-请求推流结果] 异常内容: ", e);
|
||||
log.warn("[REDIS消息-请求推流结果] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
|
||||
log.error("[REDIS消息-请求推流结果] 异常内容: ", e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@@ -3,12 +3,11 @@ package com.genersoft.iot.vmp.service.redisMsg;
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.genersoft.iot.vmp.media.service.IMediaServerService;
|
||||
import com.genersoft.iot.vmp.streamPush.bean.StreamPush;
|
||||
import com.genersoft.iot.vmp.service.IGbStreamService;
|
||||
import com.genersoft.iot.vmp.streamPush.bean.StreamPush;
|
||||
import com.genersoft.iot.vmp.streamPush.service.IStreamPushService;
|
||||
import com.genersoft.iot.vmp.utils.DateUtil;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.data.redis.connection.Message;
|
||||
@@ -27,10 +26,10 @@ import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
* @Date: 2022/8/16 11:32
|
||||
* @Description: 接收redis发送的推流设备列表更新通知
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class RedisPushStreamStatusListMsgListener implements MessageListener {
|
||||
|
||||
private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamStatusListMsgListener.class);
|
||||
@Resource
|
||||
private IMediaServerService mediaServerService;
|
||||
|
||||
@@ -48,7 +47,7 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener {
|
||||
|
||||
@Override
|
||||
public void onMessage(Message message, byte[] bytes) {
|
||||
logger.info("[REDIS消息-推流设备列表更新]: {}", new String(message.getBody()));
|
||||
log.info("[REDIS消息-推流设备列表更新]: {}", new String(message.getBody()));
|
||||
boolean isEmpty = taskQueue.isEmpty();
|
||||
taskQueue.offer(message);
|
||||
if (isEmpty) {
|
||||
@@ -74,7 +73,7 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener {
|
||||
if (!contains) {
|
||||
if (allGBId.containsKey(streamPush.getGbDeviceId())) {
|
||||
StreamPush streamPushInDb = allGBId.get(streamPush.getGbDeviceId());
|
||||
logger.warn("[REDIS消息-推流设备列表更新-INSERT] 国标编号重复: {}, 已分配给{}/{}",
|
||||
log.warn("[REDIS消息-推流设备列表更新-INSERT] 国标编号重复: {}, 已分配给{}/{}",
|
||||
streamPushInDb.getGbDeviceId(), streamPushInDb.getApp(), streamPushInDb.getStream());
|
||||
continue;
|
||||
}
|
||||
@@ -87,7 +86,7 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener {
|
||||
&& (!allGBId.get(streamPush.getGbDeviceId()).getApp().equals(streamPush.getApp())
|
||||
|| !allGBId.get(streamPush.getGbDeviceId()).getStream().equals(streamPush.getStream()))) {
|
||||
StreamPush streamPushInDb = allGBId.get(streamPush.getGbDeviceId());
|
||||
logger.warn("[REDIS消息-推流设备列表更新-UPDATE] 国标编号重复: {}, 已分配给{}/{}",
|
||||
log.warn("[REDIS消息-推流设备列表更新-UPDATE] 国标编号重复: {}, 已分配给{}/{}",
|
||||
streamPush.getGbDeviceId(), streamPushInDb.getApp(), streamPushInDb.getStream());
|
||||
continue;
|
||||
}
|
||||
@@ -96,19 +95,19 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener {
|
||||
}
|
||||
}
|
||||
if (!streamPushItemForSave.isEmpty()) {
|
||||
logger.info("添加{}条",streamPushItemForSave.size());
|
||||
logger.info(JSONObject.toJSONString(streamPushItemForSave));
|
||||
log.info("添加{}条",streamPushItemForSave.size());
|
||||
log.info(JSONObject.toJSONString(streamPushItemForSave));
|
||||
streamPushService.batchAdd(streamPushItemForSave);
|
||||
|
||||
}
|
||||
if(!streamPushItemForUpdate.isEmpty()){
|
||||
logger.info("修改{}条",streamPushItemForUpdate.size());
|
||||
logger.info(JSONObject.toJSONString(streamPushItemForUpdate));
|
||||
log.info("修改{}条",streamPushItemForUpdate.size());
|
||||
log.info(JSONObject.toJSONString(streamPushItemForUpdate));
|
||||
gbStreamService.updateGbIdOrName(streamPushItemForUpdate);
|
||||
}
|
||||
}catch (Exception e) {
|
||||
logger.warn("[REDIS消息-推流设备列表更新] 发现未处理的异常, \r\n{}", new String(message.getBody()));
|
||||
logger.error("[REDIS消息-推流设备列表更新] 异常内容: ", e);
|
||||
log.warn("[REDIS消息-推流设备列表更新] 发现未处理的异常, \r\n{}", new String(message.getBody()));
|
||||
log.error("[REDIS消息-推流设备列表更新] 异常内容: ", e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@@ -4,11 +4,10 @@ import com.alibaba.fastjson2.JSON;
|
||||
import com.genersoft.iot.vmp.common.VideoManagerConstants;
|
||||
import com.genersoft.iot.vmp.conf.DynamicTask;
|
||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||
import com.genersoft.iot.vmp.streamPush.service.IStreamPushService;
|
||||
import com.genersoft.iot.vmp.service.bean.PushStreamStatusChangeFromRedisDto;
|
||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import com.genersoft.iot.vmp.streamPush.service.IStreamPushService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.boot.ApplicationArguments;
|
||||
@@ -25,11 +24,10 @@ import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
* 接收redis发送的推流设备上线下线通知
|
||||
* @author lin
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class RedisPushStreamStatusMsgListener implements MessageListener, ApplicationRunner {
|
||||
|
||||
private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamStatusMsgListener.class);
|
||||
|
||||
@Autowired
|
||||
private IRedisCatchStorage redisCatchStorage;
|
||||
|
||||
@@ -53,7 +51,7 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
|
||||
@Override
|
||||
public void onMessage(Message message, byte[] bytes) {
|
||||
boolean isEmpty = taskQueue.isEmpty();
|
||||
logger.warn("[REDIS消息-推流设备状态变化]: {}", new String(message.getBody()));
|
||||
log.warn("[REDIS消息-推流设备状态变化]: {}", new String(message.getBody()));
|
||||
taskQueue.offer(message);
|
||||
|
||||
if (isEmpty) {
|
||||
@@ -63,7 +61,7 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
|
||||
try {
|
||||
PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(msg.getBody(), PushStreamStatusChangeFromRedisDto.class);
|
||||
if (statusChangeFromPushStream == null) {
|
||||
logger.warn("[REDIS消息]推流设备状态变化消息解析失败");
|
||||
log.warn("[REDIS消息]推流设备状态变化消息解析失败");
|
||||
continue;
|
||||
}
|
||||
// 取消定时任务
|
||||
@@ -83,8 +81,8 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
|
||||
streamPushService.online(statusChangeFromPushStream.getOnlineStreams());
|
||||
}
|
||||
}catch (Exception e) {
|
||||
logger.warn("[REDIS消息-推流设备状态变化] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
|
||||
logger.error("[REDIS消息-推流设备状态变化] 异常内容: ", e);
|
||||
log.warn("[REDIS消息-推流设备状态变化] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
|
||||
log.error("[REDIS消息-推流设备状态变化] 异常内容: ", e);
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -97,7 +95,7 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
|
||||
// 启动时设置所有推流通道离线,发起查询请求
|
||||
redisCatchStorage.sendStreamPushRequestedMsgForStatus();
|
||||
dynamicTask.startDelay(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED, ()->{
|
||||
logger.info("[REDIS消息]未收到redis回复推流设备状态,执行推流设备离线");
|
||||
log.info("[REDIS消息]未收到redis回复推流设备状态,执行推流设备离线");
|
||||
// 五秒收不到请求就设置通道离线,然后通知上级离线
|
||||
streamPushService.allOffline();
|
||||
}, 5000);
|
||||
|
||||
@@ -22,8 +22,7 @@ import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
@@ -35,11 +34,10 @@ import java.text.ParseException;
|
||||
/**
|
||||
* 其他wvp发起的rpc调用,这里的方法被 RedisRpcConfig 通过反射寻找对应的方法名称调用
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class RedisRpcController {
|
||||
|
||||
private final static Logger logger = LoggerFactory.getLogger(RedisRpcController.class);
|
||||
|
||||
@Autowired
|
||||
private SSRCFactory ssrcFactory;
|
||||
|
||||
@@ -77,12 +75,12 @@ public class RedisRpcController {
|
||||
String sendRtpItemKey = request.getParam().toString();
|
||||
SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey);
|
||||
if (sendRtpItem == null) {
|
||||
logger.info("[redis-rpc] 获取发流的信息, 未找到redis中的发流信息, key:{}", sendRtpItemKey);
|
||||
log.info("[redis-rpc] 获取发流的信息, 未找到redis中的发流信息, key:{}", sendRtpItemKey);
|
||||
RedisRpcResponse response = request.getResponse();
|
||||
response.setStatusCode(200);
|
||||
return response;
|
||||
}
|
||||
logger.info("[redis-rpc] 获取发流的信息: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
|
||||
log.info("[redis-rpc] 获取发流的信息: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
|
||||
// 查询本级是否有这个流
|
||||
MediaServer mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream());
|
||||
if (mediaServerItem == null) {
|
||||
@@ -92,7 +90,7 @@ public class RedisRpcController {
|
||||
// 自平台内容
|
||||
int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
|
||||
if (localPort == 0) {
|
||||
logger.info("[redis-rpc] getSendRtpItem->服务器端口资源不足" );
|
||||
log.info("[redis-rpc] getSendRtpItem->服务器端口资源不足" );
|
||||
RedisRpcResponse response = request.getResponse();
|
||||
response.setStatusCode(200);
|
||||
}
|
||||
@@ -118,11 +116,11 @@ public class RedisRpcController {
|
||||
*/
|
||||
public RedisRpcResponse waitePushStreamOnline(RedisRpcRequest request) {
|
||||
SendRtpItem sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpItem.class);
|
||||
logger.info("[redis-rpc] 监听流上线: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
|
||||
log.info("[redis-rpc] 监听流上线: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
|
||||
// 查询本级是否有这个流
|
||||
MediaServer mediaServer = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream());
|
||||
if (mediaServer != null) {
|
||||
logger.info("[redis-rpc] 监听流上线时发现流已存在直接返回: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
|
||||
log.info("[redis-rpc] 监听流上线时发现流已存在直接返回: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
|
||||
// 读取redis中的上级点播信息,生成sendRtpItm发送出去
|
||||
if (sendRtpItem.getSsrc() == null) {
|
||||
// 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
|
||||
@@ -141,7 +139,7 @@ public class RedisRpcController {
|
||||
// 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
|
||||
Hook hook = Hook.getInstance(HookType.on_media_arrival, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
|
||||
hookSubscribe.addSubscribe(hook, (hookData) -> {
|
||||
logger.info("[redis-rpc] 监听流上线,流已上线: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
|
||||
log.info("[redis-rpc] 监听流上线,流已上线: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
|
||||
// 读取redis中的上级点播信息,生成sendRtpItm发送出去
|
||||
if (sendRtpItem.getSsrc() == null) {
|
||||
// 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
|
||||
@@ -169,7 +167,7 @@ public class RedisRpcController {
|
||||
*/
|
||||
public RedisRpcResponse stopWaitePushStreamOnline(RedisRpcRequest request) {
|
||||
SendRtpItem sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpItem.class);
|
||||
logger.info("[redis-rpc] 停止监听流上线: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
|
||||
log.info("[redis-rpc] 停止监听流上线: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
|
||||
// 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
|
||||
Hook hook = Hook.getInstance(HookType.on_media_arrival, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
|
||||
hookSubscribe.removeSubscribe(hook);
|
||||
@@ -188,22 +186,22 @@ public class RedisRpcController {
|
||||
RedisRpcResponse response = request.getResponse();
|
||||
response.setStatusCode(200);
|
||||
if (sendRtpItem == null) {
|
||||
logger.info("[redis-rpc] 开始发流, 未找到redis中的发流信息, key:{}", sendRtpItemKey);
|
||||
log.info("[redis-rpc] 开始发流, 未找到redis中的发流信息, key:{}", sendRtpItemKey);
|
||||
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息");
|
||||
response.setBody(wvpResult);
|
||||
return response;
|
||||
}
|
||||
logger.info("[redis-rpc] 开始发流: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
|
||||
log.info("[redis-rpc] 开始发流: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
|
||||
MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
|
||||
if (mediaServer == null) {
|
||||
logger.info("[redis-rpc] startSendRtp->未找到MediaServer: {}", sendRtpItem.getMediaServerId() );
|
||||
log.info("[redis-rpc] startSendRtp->未找到MediaServer: {}", sendRtpItem.getMediaServerId() );
|
||||
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer");
|
||||
response.setBody(wvpResult);
|
||||
return response;
|
||||
}
|
||||
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream());
|
||||
if (mediaInfo != null) {
|
||||
logger.info("[redis-rpc] startSendRtp->流不在线: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
|
||||
log.info("[redis-rpc] startSendRtp->流不在线: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
|
||||
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "流不在线");
|
||||
response.setBody(wvpResult);
|
||||
return response;
|
||||
@@ -211,12 +209,12 @@ public class RedisRpcController {
|
||||
try {
|
||||
mediaServerService.startSendRtp(mediaServer, sendRtpItem);
|
||||
}catch (ControllerException exception) {
|
||||
logger.info("[redis-rpc] 发流失败: {}/{}, 目标地址: {}:{}, {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), exception.getMsg());
|
||||
log.info("[redis-rpc] 发流失败: {}/{}, 目标地址: {}:{}, {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), exception.getMsg());
|
||||
WVPResult wvpResult = WVPResult.fail(exception.getCode(), exception.getMsg());
|
||||
response.setBody(wvpResult);
|
||||
return response;
|
||||
}
|
||||
logger.info("[redis-rpc] 发流成功: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
|
||||
log.info("[redis-rpc] 发流成功: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
|
||||
WVPResult wvpResult = WVPResult.success();
|
||||
response.setBody(wvpResult);
|
||||
return response;
|
||||
@@ -231,15 +229,15 @@ public class RedisRpcController {
|
||||
RedisRpcResponse response = request.getResponse();
|
||||
response.setStatusCode(200);
|
||||
if (sendRtpItem == null) {
|
||||
logger.info("[redis-rpc] 停止推流, 未找到redis中的发流信息, key:{}", sendRtpItemKey);
|
||||
log.info("[redis-rpc] 停止推流, 未找到redis中的发流信息, key:{}", sendRtpItemKey);
|
||||
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息");
|
||||
response.setBody(wvpResult);
|
||||
return response;
|
||||
}
|
||||
logger.info("[redis-rpc] 停止推流: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
|
||||
log.info("[redis-rpc] 停止推流: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
|
||||
MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
|
||||
if (mediaServer == null) {
|
||||
logger.info("[redis-rpc] stopSendRtp->未找到MediaServer: {}", sendRtpItem.getMediaServerId() );
|
||||
log.info("[redis-rpc] stopSendRtp->未找到MediaServer: {}", sendRtpItem.getMediaServerId() );
|
||||
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer");
|
||||
response.setBody(wvpResult);
|
||||
return response;
|
||||
@@ -247,12 +245,12 @@ public class RedisRpcController {
|
||||
try {
|
||||
mediaServerService.stopSendRtp(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
|
||||
}catch (ControllerException exception) {
|
||||
logger.info("[redis-rpc] 停止推流失败: {}/{}, 目标地址: {}:{}, code: {}, msg: {}", sendRtpItem.getApp(),
|
||||
log.info("[redis-rpc] 停止推流失败: {}/{}, 目标地址: {}:{}, code: {}, msg: {}", sendRtpItem.getApp(),
|
||||
sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), exception.getCode(), exception.getMsg() );
|
||||
response.setBody(WVPResult.fail(exception.getCode(), exception.getMsg()));
|
||||
return response;
|
||||
}
|
||||
logger.info("[redis-rpc] 停止推流成功: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
|
||||
log.info("[redis-rpc] 停止推流成功: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
|
||||
response.setBody(WVPResult.success());
|
||||
return response;
|
||||
}
|
||||
@@ -266,10 +264,10 @@ public class RedisRpcController {
|
||||
RedisRpcResponse response = request.getResponse();
|
||||
response.setStatusCode(200);
|
||||
if (sendRtpItem == null) {
|
||||
logger.info("[redis-rpc] 推流已经停止, 未找到redis中的发流信息, key:{}", sendRtpItemKey);
|
||||
log.info("[redis-rpc] 推流已经停止, 未找到redis中的发流信息, key:{}", sendRtpItemKey);
|
||||
return response;
|
||||
}
|
||||
logger.info("[redis-rpc] 推流已经停止: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
|
||||
log.info("[redis-rpc] 推流已经停止: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
|
||||
String platformId = sendRtpItem.getPlatformId();
|
||||
ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
|
||||
if (platform == null) {
|
||||
@@ -281,13 +279,13 @@ public class RedisRpcController {
|
||||
sendRtpItem.getCallId(), sendRtpItem.getStream());
|
||||
redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform);
|
||||
} catch (SipException | InvalidArgumentException | ParseException e) {
|
||||
logger.error("[命令发送失败] 发送BYE: {}", e.getMessage());
|
||||
log.error("[命令发送失败] 发送BYE: {}", e.getMessage());
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
||||
private void sendResponse(RedisRpcResponse response){
|
||||
logger.info("[redis-rpc] >> {}", response);
|
||||
log.info("[redis-rpc] >> {}", response);
|
||||
response.setToId(userSetting.getServerId());
|
||||
RedisRpcMessage message = new RedisRpcMessage();
|
||||
message.setResponse(response);
|
||||
|
||||
Reference in New Issue
Block a user