Merge branch '648540858:wvp-28181-2.0' into wvp-28181-2.0

This commit is contained in:
hotcoffie
2022-05-17 11:53:42 +08:00
committed by GitHub
79 changed files with 1163 additions and 1209 deletions

View File

@@ -3,56 +3,111 @@ package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.SyncStatus;
import java.util.List;
/**
* 设备相关业务处理
* @author lin
*/
public interface IDeviceService {
/**
* 设备上线
* @param device 设备信息
*/
void online(Device device);
/**
* 设备下线
* @param deviceId 设备编号
*/
void offline(String deviceId);
/**
* 添加目录订阅
* @param device 设备信息
* @return
* @return 布尔
*/
boolean addCatalogSubscribe(Device device);
/**
* 移除目录订阅
* @param device 设备信息
* @return
* @return 布尔
*/
boolean removeCatalogSubscribe(Device device);
/**
* 添加移动位置订阅
* @param device 设备信息
* @return
* @return 布尔
*/
boolean addMobilePositionSubscribe(Device device);
/**
* 移除移动位置订阅
* @param device 设备信息
* @return
* @return 布尔
*/
boolean removeMobilePositionSubscribe(Device device);
/**
* 移除移动位置订阅
* @param deviceId 设备ID
* @return
* @return 同步状态
*/
SyncStatus getChannelSyncStatus(String deviceId);
/**
* 查看是否仍在同步
* @param deviceId 设备ID
* @return
* @return 布尔
*/
Boolean isSyncRunning(String deviceId);
/**
* 通道同步
* @param device
* @param device 设备信息
*/
void sync(Device device);
/**
* 查询设备信息
* @param deviceId 设备编号
* @return 设备信息
*/
Device queryDevice(String deviceId);
/**
* 获取所有在线设备
* @return 设备列表
*/
List<Device> getAllOnlineDevice();
/**
* 判断是否注册已经失效
* @param device 设备信息
* @return 布尔
*/
boolean expire(Device device);
/**
* 检查设备状态
* @param device 设备信息
*/
void checkDeviceStatus(Device device);
/**
* 根据IP和端口获取设备信息
* @param host IP
* @param port 端口
* @return 设备信息
*/
Device getDeviceByHostAndPort(String host, int port);
/**
* 更新设备
* @param device 设备信息
*/
void updateDevice(Device device);
}

View File

@@ -44,7 +44,7 @@ public interface IMediaServerService {
void updateVmServer(List<MediaServerItem> mediaServerItemList);
SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, boolean ssrcCheck);
SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, boolean ssrcCheck, boolean isPlayback);
SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, String ssrc, boolean ssrcCheck, boolean isPlayback);

View File

@@ -2,21 +2,27 @@ package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask;
import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask;
import com.genersoft.iot.vmp.gb28181.bean.SyncStatus;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.DeviceMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import javax.sip.DialogState;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* 设备业务(目录订阅)
@@ -26,6 +32,8 @@ public class DeviceServiceImpl implements IDeviceService {
private final static Logger logger = LoggerFactory.getLogger(DeviceServiceImpl.class);
private final String registerExpireTaskKeyPrefix = "device-register-expire-";
@Autowired
private DynamicTask dynamicTask;
@@ -38,6 +46,83 @@ public class DeviceServiceImpl implements IDeviceService {
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private DeviceMapper deviceMapper;
@Autowired
private ISIPCommander commander;
@Autowired
private VideoStreamSessionManager streamSession;
@Autowired
private IMediaServerService mediaServerService;
@Override
public void online(Device device) {
logger.info("[设备上线] deviceId{}->{}:{}", device.getDeviceId(), device.getIp(), device.getPort());
Device deviceInRedis = redisCatchStorage.getDevice(device.getDeviceId());
Device deviceInDb = deviceMapper.getDeviceByDeviceId(device.getDeviceId());
String now = DateUtil.getNow();
if (deviceInRedis != null && deviceInDb == null) {
// redis 存在脏数据
redisCatchStorage.clearCatchByDeviceId(device.getDeviceId());
}
device.setUpdateTime(now);
device.setOnline(1);
// 第一次上线
if (device.getCreateTime() == null) {
device.setCreateTime(now);
logger.info("[设备上线,首次注册]: {},查询设备信息以及通道信息", device.getDeviceId());
commander.deviceInfoQuery(device);
sync(device);
deviceMapper.add(device);
}else {
deviceMapper.update(device);
}
redisCatchStorage.updateDevice(device);
// 上线添加订阅
if (device.getSubscribeCycleForCatalog() > 0) {
// 查询在线设备那些开启了订阅,为设备开启定时的目录订阅
addCatalogSubscribe(device);
}
if (device.getSubscribeCycleForMobilePosition() > 0) {
addMobilePositionSubscribe(device);
}
// 刷新过期任务
String registerExpireTaskKey = registerExpireTaskKeyPrefix + device.getDeviceId();
dynamicTask.stop(registerExpireTaskKey);
dynamicTask.startDelay(registerExpireTaskKey, ()-> offline(device.getDeviceId()), device.getExpires() * 1000);
}
@Override
public void offline(String deviceId) {
Device device = deviceMapper.getDeviceByDeviceId(deviceId);
if (device == null) {
return;
}
String registerExpireTaskKey = registerExpireTaskKeyPrefix + deviceId;
dynamicTask.stop(registerExpireTaskKey);
device.setOnline(0);
redisCatchStorage.updateDevice(device);
deviceMapper.update(device);
// 离线释放所有ssrc
List<SsrcTransaction> ssrcTransactions = streamSession.getSsrcTransactionForAll(deviceId, null, null, null);
if (ssrcTransactions != null && ssrcTransactions.size() > 0) {
for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
mediaServerService.closeRTPServer(deviceId, ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
streamSession.remove(deviceId, ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
}
}
// 移除订阅
removeCatalogSubscribe(device);
removeMobilePositionSubscribe(device);
}
@Override
public boolean addCatalogSubscribe(Device device) {
if (device == null || device.getSubscribeCycleForCatalog() < 0) {
@@ -49,7 +134,7 @@ public class DeviceServiceImpl implements IDeviceService {
// 提前开始刷新订阅
int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForCatalog(),30);
// 设置最小值为30
dynamicTask.startCron(device.getDeviceId() + "catalog", catalogSubscribeTask, subscribeCycleForCatalog -1);
dynamicTask.startCron(device.getDeviceId() + "catalog", catalogSubscribeTask, (subscribeCycleForCatalog -1) * 1000);
return true;
}
@@ -74,7 +159,7 @@ public class DeviceServiceImpl implements IDeviceService {
// 设置最小值为30
int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForMobilePosition(),30);
// 提前开始刷新订阅
dynamicTask.startCron(device.getDeviceId() + "mobile_position" , mobilePositionSubscribeTask, subscribeCycleForCatalog -1 );
dynamicTask.startCron(device.getDeviceId() + "mobile_position" , mobilePositionSubscribeTask, (subscribeCycleForCatalog -1 ) * 1000);
return true;
}
@@ -111,4 +196,92 @@ public class DeviceServiceImpl implements IDeviceService {
catalogResponseMessageHandler.setChannelSyncEnd(device.getDeviceId(), errorMsg);
});
}
@Override
public Device queryDevice(String deviceId) {
return deviceMapper.getDeviceByDeviceId(deviceId);
}
@Override
public List<Device> getAllOnlineDevice() {
return deviceMapper.getOnlineDevices();
}
@Override
public boolean expire(Device device) {
Instant registerTimeDate = Instant.from(DateUtil.formatter.parse(device.getRegisterTime()));
Instant expireInstant = registerTimeDate.plusMillis(TimeUnit.SECONDS.toMillis(device.getExpires()));
return expireInstant.isBefore(Instant.now());
}
@Override
public void checkDeviceStatus(Device device) {
if (device == null || device.getOnline() == 0) {
return;
}
sipCommander.deviceStatusQuery(device, null);
}
@Override
public Device getDeviceByHostAndPort(String host, int port) {
return deviceMapper.getDeviceByHostAndPort(host, port);
}
@Override
public void updateDevice(Device device) {
Device deviceInStore = deviceMapper.getDeviceByDeviceId(device.getDeviceId());
if (deviceInStore == null) {
logger.warn("更新设备时未找到设备信息");
return;
}
if (!StringUtils.isEmpty(device.getName())) {
deviceInStore.setName(device.getName());
}
if (!StringUtils.isEmpty(device.getCharset())) {
deviceInStore.setCharset(device.getCharset());
}
if (!StringUtils.isEmpty(device.getMediaServerId())) {
deviceInStore.setMediaServerId(device.getMediaServerId());
}
// 目录订阅相关的信息
if (device.getSubscribeCycleForCatalog() > 0) {
if (deviceInStore.getSubscribeCycleForCatalog() == 0 || deviceInStore.getSubscribeCycleForCatalog() != device.getSubscribeCycleForCatalog()) {
deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
// 开启订阅
addCatalogSubscribe(deviceInStore);
}
}else if (device.getSubscribeCycleForCatalog() == 0) {
if (deviceInStore.getSubscribeCycleForCatalog() != 0) {
deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
// 取消订阅
removeCatalogSubscribe(deviceInStore);
}
}
// 移动位置订阅相关的信息
if (device.getSubscribeCycleForMobilePosition() > 0) {
if (deviceInStore.getSubscribeCycleForMobilePosition() == 0 || deviceInStore.getSubscribeCycleForMobilePosition() != device.getSubscribeCycleForMobilePosition()) {
deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval());
deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
// 开启订阅
addMobilePositionSubscribe(deviceInStore);
}
}else if (device.getSubscribeCycleForMobilePosition() == 0) {
if (deviceInStore.getSubscribeCycleForMobilePosition() != 0) {
// 取消订阅
removeMobilePositionSubscribe(deviceInStore);
}
}
String now = DateUtil.getNow();
device.setUpdateTime(now);
device.setCharset(device.getCharset().toUpperCase());
device.setUpdateTime(DateUtil.getNow());
if (deviceMapper.update(device) > 0) {
redisCatchStorage.updateDevice(device);
}
}
}

View File

@@ -18,6 +18,7 @@ import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.storager.dao.MediaServerMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.redis.JedisUtil;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
@@ -89,14 +90,12 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Autowired
JedisUtil jedisUtil;
private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
/**
* 初始化
*/
@Override
public void updateVmServer(List<MediaServerItem> mediaServerItemList) {
logger.info("[缓存初始化] Media Server ");
logger.info("[zlm] 缓存初始化 ");
for (MediaServerItem mediaServerItem : mediaServerItemList) {
if (StringUtils.isEmpty(mediaServerItem.getId())) {
continue;
@@ -117,8 +116,8 @@ public class MediaServerServiceImpl implements IMediaServerService {
}
@Override
public SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, boolean ssrcCheck) {
return openRTPServer(mediaServerItem, streamId, null, ssrcCheck,false);
public SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, boolean ssrcCheck, boolean isPlayback) {
return openRTPServer(mediaServerItem, streamId, null, ssrcCheck,isPlayback);
}
@Override
@@ -231,7 +230,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
result.sort((serverItem1, serverItem2)->{
int sortResult = 0;
try {
sortResult = format.parse(serverItem1.getCreateTime()).compareTo(format.parse(serverItem2.getCreateTime()));
sortResult = DateUtil.format.parse(serverItem1.getCreateTime()).compareTo(DateUtil.format.parse(serverItem2.getCreateTime()));
} catch (ParseException e) {
e.printStackTrace();
}
@@ -291,8 +290,8 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Override
public WVPResult<String> add(MediaServerItem mediaServerItem) {
WVPResult<String> result = new WVPResult<>();
mediaServerItem.setCreateTime(this.format.format(System.currentTimeMillis()));
mediaServerItem.setUpdateTime(this.format.format(System.currentTimeMillis()));
mediaServerItem.setCreateTime(DateUtil.getNow());
mediaServerItem.setUpdateTime(DateUtil.getNow());
mediaServerItem.setHookAliveInterval(120);
JSONObject responseJSON = zlmresTfulUtils.getMediaServerConfig(mediaServerItem);
if (responseJSON != null) {
@@ -353,7 +352,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
*/
@Override
public void zlmServerOnline(ZLMServerConfig zlmServerConfig) {
logger.info("[ ZLM{} ]-[ {}:{} ]正在连接",
logger.info("[ZLM] 正在连接 : {} -> {}:{}",
zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort());
MediaServerItem serverItem = mediaServerMapper.queryOne(zlmServerConfig.getGeneralMediaServerId());
@@ -406,7 +405,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
setZLMConfig(serverItem, "0".equals(zlmServerConfig.getHookEnable()));
}
publisher.zlmOnlineEventPublish(serverItem.getId());
logger.info("[ ZLM{} ]-[ {}:{} ]连接成功",
logger.info("[ZLM] 连接成功 {} - {}:{} ",
zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort());
}
@@ -484,7 +483,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
*/
@Override
public void setZLMConfig(MediaServerItem mediaServerItem, boolean restart) {
logger.info("[ ZLM{} ]-[ {}:{} ]正在设置zlm",
logger.info("[ZLM] 正在设置 {} -> {}:{}",
mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
String protocol = sslEnabled ? "https" : "http";
String hookPrex = String.format("%s://%s:%s/index/hook", protocol, mediaServerItem.getHookIp(), serverPort);
@@ -528,17 +527,17 @@ public class MediaServerServiceImpl implements IMediaServerService {
if (responseJSON != null && responseJSON.getInteger("code") == 0) {
if (restart) {
logger.info("[ ZLM{} ]-[ {}:{} ]设置zlm成功, 开始重启以保证配置生效",
logger.info("[ZLM] 设置成功,开始重启以保证配置生效 {} -> {}:{}",
mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
zlmresTfulUtils.restartServer(mediaServerItem);
}else {
logger.info("[ ZLM{} ]-[ {}:{} ]设置zlm成功",
logger.info("[ZLM] 设置成功 {} -> {}:{}",
mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
}
}else {
logger.info("[ ZLM{} ]-[ {}:{} ]设置zlm失败",
logger.info("[ZLM] 设置zlm失败 {} -> {}:{}",
mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
}

View File

@@ -13,7 +13,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.utils.DateUtil;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
@@ -190,7 +190,7 @@ public class PlayServiceImpl implements IPlayService {
if (mediaServerItem.isRtpEnable()) {
streamId = String.format("%s_%s", device.getDeviceId(), channelId);
}
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck());
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false);
play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response)->{
if (hookEvent != null) {
hookEvent.response(mediaServerItem, response);
@@ -234,7 +234,7 @@ public class PlayServiceImpl implements IPlayService {
streamId = String.format("%s_%s", device.getDeviceId(), channelId);
}
if (ssrcInfo == null) {
ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck());
ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false);
}
// 超时处理
@@ -357,7 +357,7 @@ public class PlayServiceImpl implements IPlayService {
return null;
}
MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true);
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true, true);
return playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, inviteStreamCallback, callback);
}
@@ -444,7 +444,7 @@ public class PlayServiceImpl implements IPlayService {
return null;
}
MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true);
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true, true);
return download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed,infoCallBack, hookCallBack);
}

View File

@@ -4,20 +4,15 @@ import com.alibaba.fastjson.JSON;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
@Component
public class RedisAlarmMsgListener implements MessageListener {
@@ -33,8 +28,6 @@ public class RedisAlarmMsgListener implements MessageListener {
@Autowired
private IVideoManagerStorage storage;
private final SimpleDateFormat formatForGB = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
@Override
public void onMessage(Message message, byte[] bytes) {
logger.info("收到来自REDIS的ALARM通知 {}", new String(message.getBody()));
@@ -52,7 +45,7 @@ public class RedisAlarmMsgListener implements MessageListener {
deviceAlarm.setAlarmDescription(alarmChannelMessage.getAlarmDescription());
deviceAlarm.setAlarmMethod("" + alarmChannelMessage.getAlarmSn());
deviceAlarm.setAlarmPriority("1");
deviceAlarm.setAlarmTime(formatForGB.format(System.currentTimeMillis()));
deviceAlarm.setAlarmTime(DateUtil.getNow());
deviceAlarm.setAlarmType("1");
deviceAlarm.setLongitude(0);
deviceAlarm.setLatitude(0);